You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sa...@apache.org on 2024/02/27 04:44:29 UTC
(pinot) branch master updated: Record enricher (#12243)
This is an automated email from the ASF dual-hosted git repository.
saurabhd336 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 07daa7bbaa Record enricher (#12243)
07daa7bbaa is described below
commit 07daa7bbaa92c882758f086d4d30dad96c273473
Author: Saurabh Dubey <sa...@gmail.com>
AuthorDate: Tue Feb 27 10:14:23 2024 +0530
Record enricher (#12243)
* Enricher interface
* Support in realtime ingestion
* Fix offline segment builder
* Add enricher to mapper
* Add columnsToExtract config
* Complete enricher pipeline usage
* Enrichment configs
* Add transform function based enrichment
* Change package name
* Change package name
* Review comments
* Add table config validation
* Review comments
* Change to fieldToFunctionMap
---------
Co-authored-by: Saurabh Dubey <sa...@saurabhs-macbook-pro-1.tail8a064.ts.net>
Co-authored-by: Saurabh Dubey <sa...@Saurabhs-MacBook-Pro.local>
---
.../connector/flink/sink/FlinkSegmentWriter.java | 4 +
.../realtime/RealtimeSegmentDataManager.java | 11 +++
.../framework/SegmentProcessorFramework.java | 2 +
.../segment/processing/mapper/SegmentMapper.java | 4 +
.../apache/pinot/queries/TransformQueriesTest.java | 2 +-
.../record/enricher/clp/CLPEncodingEnricher.java | 100 +++++++++++++++++++++
.../enricher/clp/CLPEncodingEnricherFactory.java | 51 +++++++++++
.../record/enricher/clp/ClpEnricherConfig.java | 40 +++++++++
.../enricher/function/CustomFunctionEnricher.java | 66 ++++++++++++++
.../function/CustomFunctionEnricherConfig.java | 40 +++++++++
.../function/CustomFunctionEnricherFactory.java | 61 +++++++++++++
.../converter/RealtimeSegmentConverter.java | 4 +-
.../RecordReaderSegmentCreationDataSource.java | 9 ++
.../impl/SegmentIndexCreationDriverImpl.java | 12 ++-
.../pinot/segment/local/utils/IngestionUtils.java | 8 +-
.../segment/local/utils/TableConfigUtils.java | 13 +++
.../spi/annotations/RecordEnricherFactory.java | 30 +++++++
.../config/table/ingestion/EnrichmentConfig.java | 49 ++++++++++
.../config/table/ingestion/IngestionConfig.java | 14 +++
.../pinot/spi/recordenricher/RecordEnricher.java | 40 +++++++++
.../spi/recordenricher/RecordEnricherConfig.java | 23 +++++
.../spi/recordenricher/RecordEnricherFactory.java | 29 ++++++
.../spi/recordenricher/RecordEnricherPipeline.java | 75 ++++++++++++++++
.../spi/recordenricher/RecordEnricherRegistry.java | 62 +++++++++++++
.../RecordEnricherValidationConfig.java | 35 ++++++++
25 files changed, 777 insertions(+), 7 deletions(-)
diff --git a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
index 8815bbf37a..e9bd29701b 100644
--- a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
+++ b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/sink/FlinkSegmentWriter.java
@@ -53,6 +53,7 @@ import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.Constants;
import org.apache.pinot.spi.ingestion.segment.writer.SegmentWriter;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
private String _outputDirURI;
private Schema _schema;
private Set<String> _fieldsToRead;
+ private RecordEnricherPipeline _recordEnricherPipeline;
private RecordTransformer _recordTransformer;
private File _stagingDir;
@@ -137,6 +139,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
_schema = schema;
_fieldsToRead = _schema.getColumnNames();
+ _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(_tableConfig);
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
_avroSchema = SegmentProcessorAvroUtils.convertPinotSchemaToAvroSchema(_schema);
_reusableRecord = new GenericData.Record(_avroSchema);
@@ -172,6 +175,7 @@ public class FlinkSegmentWriter implements SegmentWriter {
public void collect(GenericRow row)
throws IOException {
long startTime = System.currentTimeMillis();
+ _recordEnricherPipeline.run(row);
GenericRow transform = _recordTransformer.transform(row);
SegmentProcessorAvroUtils.convertGenericRowToAvroRecord(transform, _reusableRecord, _fieldsToRead);
_rowCount++;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d64e85fada..5dd8fc025a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -75,6 +75,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.metrics.PinotMeter;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.stream.ConsumerPartitionState;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -272,6 +273,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
private final int _partitionGroupId;
private final PartitionGroupConsumptionStatus _partitionGroupConsumptionStatus;
final String _clientId;
+ private final RecordEnricherPipeline _recordEnricherPipeline;
private final TransformPipeline _transformPipeline;
private PartitionGroupConsumer _partitionGroupConsumer = null;
private StreamMetadataProvider _partitionMetadataProvider = null;
@@ -575,6 +577,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
_numRowsErrored++;
} else {
try {
+ _recordEnricherPipeline.run(decodedRow.getResult());
_transformPipeline.processRow(decodedRow.getResult(), reusedResult);
} catch (Exception e) {
_numRowsErrored++;
@@ -1480,6 +1483,14 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
new SegmentErrorInfo(now(), "Failed to initialize the StreamMessageDecoder", e));
throw e;
}
+
+ try {
+ _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
+ } catch (Exception e) {
+ _realtimeTableDataManager.addSegmentError(_segmentNameStr,
+ new SegmentErrorInfo(now(), "Failed to initialize the RecordEnricherPipeline", e));
+ throw e;
+ }
_transformPipeline = new TransformPipeline(tableConfig, schema);
// Acquire semaphore to create stream consumers
try {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 868583d0d3..def4f75b29 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -43,6 +43,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -289,6 +290,7 @@ public class SegmentProcessorFramework {
GenericRowFileRecordReader recordReaderForRange = recordReader.getRecordReaderForRange(startRowId, endRowId);
SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
+ RecordEnricherPipeline.getPassThroughPipeline(),
TransformPipeline.getPassThroughPipeline());
driver.build();
outputSegmentDirs.add(driver.getOutputDirectory());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 407cbd4dcd..77b651d943 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFileConfig;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.utils.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,6 +69,7 @@ public class SegmentMapper {
private final List<FieldSpec> _fieldSpecs;
private final boolean _includeNullFields;
private final int _numSortFields;
+ private final RecordEnricherPipeline _recordEnricherPipeline;
private final CompositeTransformer _recordTransformer;
private final TimeHandler _timeHandler;
private final Partitioner[] _partitioners;
@@ -92,6 +94,7 @@ public class SegmentMapper {
_fieldSpecs = pair.getLeft();
_numSortFields = pair.getRight();
_includeNullFields = tableConfig.getIndexingConfig().isNullHandlingEnabled();
+ _recordEnricherPipeline = RecordEnricherPipeline.fromTableConfig(tableConfig);
_recordTransformer = CompositeTransformer.composeAllTransformers(_customRecordTransformers, tableConfig, schema);
_timeHandler = TimeHandlerFactory.getTimeHandler(processorConfig);
List<PartitionerConfig> partitionerConfigs = processorConfig.getPartitionerConfigs();
@@ -166,6 +169,7 @@ public class SegmentMapper {
observer.accept(String.format("Doing map phase on data from RecordReader (%d out of %d)", count, totalCount));
while (recordReader.hasNext() && (_adaptiveSizeBasedWriter.canWrite())) {
reuse = recordReader.next(reuse);
+ _recordEnricherPipeline.run(reuse);
// TODO: Add ComplexTypeTransformer here. Currently it is not idempotent so cannot add it
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
index 572080e44e..1f04d16d3b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/TransformQueriesTest.java
@@ -132,7 +132,7 @@ public class TransformQueriesTest extends BaseQueriesTest {
TableConfig tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME)
- .setIngestionConfig(new IngestionConfig(null, null, null,
+ .setIngestionConfig(new IngestionConfig(null, null, null, null,
Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || "
+ "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")),
null, null, null))
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java
new file mode 100644
index 0000000000..790015d8af
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricher.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.clp;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Enriches the record with CLP encoded fields.
+ * For a column 'x', it adds three new columns to the record:
+ * 1. 'x_logtype' - The logtype of the encoded message
+ * 2. 'x_dictVars' - The dictionary variables of the encoded message
+ * 3. 'x_encodedVars' - The encoded variables of the encoded message
+ *
+ * <p>Columns whose value is {@code null} are skipped. For a non-string value, or a string that fails to encode,
+ * all three output columns are set to {@code null}.
+ *
+ * <p>Not thread-safe: one {@link EncodedMessage} buffer is reused for every encoded value.
+ */
+public class CLPEncodingEnricher implements RecordEnricher {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+  private final List<String> _fields;
+  private final EncodedMessage _clpEncodedMessage;
+  private final MessageEncoder _clpMessageEncoder;
+
+  /**
+   * @param enricherProperties JSON properties deserialized into a {@link ClpEnricherConfig}
+   * @throws IOException if the properties cannot be deserialized
+   */
+  public CLPEncodingEnricher(JsonNode enricherProperties) throws IOException {
+    ClpEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProperties, ClpEnricherConfig.class);
+    // Treat a missing config or "fields" entry as "no fields" rather than failing with an NPE for every record
+    List<String> fields = config != null ? config.getFields() : null;
+    _fields = fields != null ? fields : Collections.emptyList();
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  /**
+   * Adds the CLP-encoded columns for every configured field that has a non-null value in {@code record}. A failure
+   * on one field is logged together with its cause and does not stop the remaining fields from being enriched.
+   */
+  @Override
+  public void enrich(GenericRow record) {
+    for (String field : _fields) {
+      Object value = record.getValue(field);
+      if (value == null) {
+        continue;
+      }
+      try {
+        enrichWithClpEncodedFields(field, value, record);
+      } catch (Exception e) {
+        LOGGER.error("Failed to enrich field: '{}' of record: {}", field, record, e);
+      }
+    }
+  }
+
+  /**
+   * CLP-encodes {@code value} and writes the logtype, dictionary-variable and encoded-variable columns for
+   * {@code key} into {@code to}. If {@code value} is not a String or cannot be encoded, all three are {@code null}.
+   */
+  private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) {
+    String logtype = null;
+    Object[] dictVars = null;
+    Object[] encodedVars = null;
+    if (value instanceof String) {
+      String valueAsString = (String) value;
+      try {
+        _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
+        logtype = _clpEncodedMessage.getLogTypeAsString();
+        encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+        dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+      } catch (IOException e) {
+        // Reset so a failure part-way through never emits a partially populated set of columns
+        logtype = null;
+        dictVars = null;
+        encodedVars = null;
+        LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}'", key, valueAsString, e);
+      }
+    } else {
+      LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
+          value.getClass().getSimpleName(), key, value);
+    }
+
+    to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
+    to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
+    to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java
new file mode 100644
index 0000000000..cacc4fdbc9
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/CLPEncodingEnricherFactory.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.clp;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.recordenricher.RecordEnricherFactory;
+import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Creates and validates {@link CLPEncodingEnricher}s for the enricher type {@code "clpEnricher"}.
+ */
+@AutoService(RecordEnricherFactory.class)
+public class CLPEncodingEnricherFactory implements RecordEnricherFactory {
+  private static final String ENRICHER_TYPE = "clpEnricher";
+
+  @Override
+  public String getEnricherType() {
+    return ENRICHER_TYPE;
+  }
+
+  @Override
+  public RecordEnricher createEnricher(JsonNode enricherProps)
+      throws IOException {
+    return new CLPEncodingEnricher(enricherProps);
+  }
+
+  /**
+   * Checks that {@code enricherProps} deserializes into a {@link ClpEnricherConfig} that lists the fields to encode.
+   * {@code validationConfig} is not consulted.
+   *
+   * @throws IllegalArgumentException if the properties cannot be parsed or do not specify {@code fields}
+   */
+  @Override
+  public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) {
+    ClpEnricherConfig config;
+    try {
+      config = JsonUtils.jsonNodeToObject(enricherProps, ClpEnricherConfig.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to parse clp enricher config", e);
+    }
+    if (config == null || config.getFields() == null) {
+      throw new IllegalArgumentException("clp enricher config must specify 'fields'");
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java
new file mode 100644
index 0000000000..93439602ec
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/clp/ClpEnricherConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.clp;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+/**
+ * Deserialized properties of the CLP enricher ({@link CLPEncodingEnricher}).
+ *
+ * <p>Expected JSON: {@code {"fields": ["col1", "col2"]}}, each entry naming a record column to CLP-encode.
+ */
+public class ClpEnricherConfig {
+  // Column names as supplied in the JSON; presumably null when "fields" is absent (Jackson's creator default)
+  private final List<String> _fields;
+
+  @JsonCreator
+  public ClpEnricherConfig(@JsonProperty("fields") List<String> fieldNames) {
+    this._fields = fieldNames;
+  }
+
+  /**
+   * @return the configured column names (the same list instance that was passed in), possibly {@code null}
+   */
+  public List<String> getFields() {
+    return _fields;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java
new file mode 100644
index 0000000000..92cf565220
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricher.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.plugin.record.enricher.function;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Enriches the record with custom functions: for each entry of the configured {@code fieldToFunctionMap}, the
+ * function expression is evaluated against the record and the result is stored under the entry's field name.
+ *
+ * <p>Entries are applied in map order and each result is written before the next function runs, so a later
+ * function may reference a field produced by an earlier one.
+ */
+public class CustomFunctionEnricher implements RecordEnricher {
+  private final Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private final List<String> _fieldsToExtract;
+
+  /**
+   * @param enricherProps JSON properties deserialized into a {@link CustomFunctionEnricherConfig}
+   * @throws IOException if the properties cannot be deserialized
+   * @throws IllegalArgumentException if the config has no {@code fieldToFunctionMap}
+   */
+  public CustomFunctionEnricher(JsonNode enricherProps) throws IOException {
+    CustomFunctionEnricherConfig config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class);
+    Map<String, String> fieldToFunction = config != null ? config.getFieldToFunctionMap() : null;
+    if (fieldToFunction == null) {
+      throw new IllegalArgumentException("Custom function enricher config must specify 'fieldToFunctionMap'");
+    }
+    _fieldToFunctionEvaluator = new LinkedHashMap<>();
+    // Several functions may share an argument; list each input column once, in first-seen order
+    Set<String> fieldsToExtract = new LinkedHashSet<>();
+    for (Map.Entry<String, String> entry : fieldToFunction.entrySet()) {
+      FunctionEvaluator functionEvaluator = FunctionEvaluatorFactory.getExpressionEvaluator(entry.getValue());
+      _fieldToFunctionEvaluator.put(entry.getKey(), functionEvaluator);
+      fieldsToExtract.addAll(functionEvaluator.getArguments());
+    }
+    _fieldsToExtract = new ArrayList<>(fieldsToExtract);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fieldsToExtract;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    _fieldToFunctionEvaluator.forEach((field, evaluator) -> record.putValue(field, evaluator.evaluate(record)));
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java
new file mode 100644
index 0000000000..cc4270f601
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherConfig.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.function;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.LinkedHashMap;
+
+/**
+ * Deserialized properties of the custom function enricher ({@link CustomFunctionEnricher}).
+ *
+ * <p>Expected JSON: {@code {"fieldToFunctionMap": {"<field>": "<function expression>"}}}. The map is a
+ * {@link LinkedHashMap} so that entries keep their declaration order.
+ */
+public class CustomFunctionEnricherConfig {
+  // Output field name -> function expression; presumably null when the key is absent from the JSON
+  private final LinkedHashMap<String, String> _fieldToFunctionMap;
+
+  @JsonCreator
+  public CustomFunctionEnricherConfig(
+      @JsonProperty("fieldToFunctionMap") LinkedHashMap<String, String> fieldToFunctionMap) {
+    this._fieldToFunctionMap = fieldToFunctionMap;
+  }
+
+  /**
+   * @return the field-to-function mapping as deserialized (the same instance, not a copy), possibly {@code null}
+   */
+  public LinkedHashMap<String, String> getFieldToFunctionMap() {
+    return _fieldToFunctionMap;
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java
new file mode 100644
index 0000000000..f773081903
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/plugin/record/enricher/function/CustomFunctionEnricherFactory.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.record.enricher.function;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.spi.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.recordenricher.RecordEnricherFactory;
+import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * Creates and validates {@link CustomFunctionEnricher}s for the enricher type {@code "generateColumn"}.
+ */
+@AutoService(RecordEnricherFactory.class)
+public class CustomFunctionEnricherFactory implements RecordEnricherFactory {
+  private static final String TYPE = "generateColumn";
+
+  @Override
+  public String getEnricherType() {
+    return TYPE;
+  }
+
+  @Override
+  public RecordEnricher createEnricher(JsonNode enricherProps)
+      throws IOException {
+    return new CustomFunctionEnricher(enricherProps);
+  }
+
+  /**
+   * Checks that {@code enricherProps} deserializes into a {@link CustomFunctionEnricherConfig} with a
+   * {@code fieldToFunctionMap} and, when {@code validationConfig} reports Groovy as disabled, that no function is a
+   * Groovy expression.
+   *
+   * @throws IllegalArgumentException if the properties cannot be parsed, lack {@code fieldToFunctionMap}, or use
+   *     Groovy while it is disabled
+   */
+  @Override
+  public void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig) {
+    CustomFunctionEnricherConfig config;
+    try {
+      config = JsonUtils.jsonNodeToObject(enricherProps, CustomFunctionEnricherConfig.class);
+    } catch (IOException e) {
+      throw new IllegalArgumentException("Failed to parse custom function enricher config", e);
+    }
+    Map<String, String> fieldToFunction = config != null ? config.getFieldToFunctionMap() : null;
+    if (fieldToFunction == null) {
+      throw new IllegalArgumentException("Custom function enricher config must specify 'fieldToFunctionMap'");
+    }
+    if (!validationConfig.isGroovyDisabled()) {
+      return;
+    }
+    for (String function : fieldToFunction.values()) {
+      if (FunctionEvaluatorFactory.isGroovyExpression(function)) {
+        throw new IllegalArgumentException("Groovy expression is not allowed for enrichment");
+      }
+    }
+  }
+}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
index a6e2d067b6..ffb9bfc23f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java
@@ -42,6 +42,7 @@ import org.apache.pinot.spi.config.table.SegmentZKPropsConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
public class RealtimeSegmentConverter {
@@ -125,7 +126,8 @@ public class RealtimeSegmentConverter {
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
RealtimeSegmentSegmentCreationDataSource dataSource =
new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader);
- driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline());
+ driver.init(genConfig, dataSource, RecordEnricherPipeline.getPassThroughPipeline(),
+ TransformPipeline.getPassThroughPipeline());
if (!_enableColumnMajor) {
driver.build();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
index 411c040cdd..219a207d94 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/RecordReaderSegmentCreationDataSource.java
@@ -25,6 +25,7 @@ import org.apache.pinot.segment.spi.creator.SegmentPreIndexStatsCollector;
import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,12 +39,17 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
private static final Logger LOGGER = LoggerFactory.getLogger(RecordReaderSegmentCreationDataSource.class);
private final RecordReader _recordReader;
+ private RecordEnricherPipeline _recordEnricherPipeline;
private TransformPipeline _transformPipeline;
public RecordReaderSegmentCreationDataSource(RecordReader recordReader) {
_recordReader = recordReader;
}
+ public void setRecordEnricherPipeline(RecordEnricherPipeline recordEnricherPipeline) {
+ _recordEnricherPipeline = recordEnricherPipeline;
+ }
+
public void setTransformPipeline(TransformPipeline transformPipeline) {
_transformPipeline = transformPipeline;
}
@@ -51,6 +57,8 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
@Override
public SegmentPreIndexStatsCollector gatherStats(StatsCollectorConfig statsCollectorConfig) {
try {
+ RecordEnricherPipeline recordEnricherPipeline = _recordEnricherPipeline != null ? _recordEnricherPipeline
+ : RecordEnricherPipeline.fromTableConfig(statsCollectorConfig.getTableConfig());
TransformPipeline transformPipeline = _transformPipeline != null ? _transformPipeline
: new TransformPipeline(statsCollectorConfig.getTableConfig(), statsCollectorConfig.getSchema());
@@ -64,6 +72,7 @@ public class RecordReaderSegmentCreationDataSource implements SegmentCreationDat
reuse.clear();
reuse = _recordReader.next(reuse);
+ recordEnricherPipeline.run(reuse);
transformPipeline.processRow(reuse, reusedResult);
for (GenericRow row : reusedResult.getTransformedRows()) {
collector.collectRow(row);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 129bc35ccf..e99d89c8b4 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -79,6 +79,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.ReadMode;
import org.slf4j.Logger;
@@ -100,6 +101,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
private SegmentCreator _indexCreator;
private SegmentIndexCreationInfo _segmentIndexCreationInfo;
private Schema _dataSchema;
+ private RecordEnricherPipeline _recordEnricherPipeline;
private TransformPipeline _transformPipeline;
private IngestionSchemaValidator _ingestionSchemaValidator;
private int _totalDocs = 0;
@@ -157,17 +159,20 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
public void init(SegmentGeneratorConfig config, RecordReader recordReader)
throws Exception {
SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
- init(config, dataSource, new TransformPipeline(config.getTableConfig(), config.getSchema()));
+ init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
+ new TransformPipeline(config.getTableConfig(), config.getSchema()));
}
@Deprecated
public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
RecordTransformer recordTransformer, @Nullable ComplexTypeTransformer complexTypeTransformer)
throws Exception {
- init(config, dataSource, new TransformPipeline(recordTransformer, complexTypeTransformer));
+ init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
+ new TransformPipeline(recordTransformer, complexTypeTransformer));
}
public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSource,
+ RecordEnricherPipeline enricherPipeline,
TransformPipeline transformPipeline)
throws Exception {
_config = config;
@@ -178,9 +183,11 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
if (config.isFailOnEmptySegment()) {
Preconditions.checkState(_recordReader.hasNext(), "No record in data source");
}
+ _recordEnricherPipeline = enricherPipeline;
_transformPipeline = transformPipeline;
// Use the same transform pipeline if the data source is backed by a record reader
if (dataSource instanceof RecordReaderSegmentCreationDataSource) {
+ ((RecordReaderSegmentCreationDataSource) dataSource).setRecordEnricherPipeline(enricherPipeline);
((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
}
@@ -244,6 +251,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
// Should not be needed anymore.
// Add row to indexes
+ _recordEnricherPipeline.run(decodedRow);
_transformPipeline.processRow(decodedRow, reusedResult);
recordReadStopTime = System.nanoTime();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
index 315bd5becf..203d7d930a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/IngestionUtils.java
@@ -65,6 +65,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.ingestion.batch.spec.TableSpec;
+import org.apache.pinot.spi.recordenricher.RecordEnricherPipeline;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
@@ -379,15 +380,16 @@ public final class IngestionUtils {
expressionContext.getColumns(fields);
}
}
+
+ fields.addAll(RecordEnricherPipeline.fromIngestionConfig(ingestionConfig).getColumnsToExtract());
List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
if (transformConfigs != null) {
for (TransformConfig transformConfig : transformConfigs) {
FunctionEvaluator expressionEvaluator =
FunctionEvaluatorFactory.getExpressionEvaluator(transformConfig.getTransformFunction());
fields.addAll(expressionEvaluator.getArguments());
- fields.add(transformConfig
- .getColumnName()); // add the column itself too, so that if it is already transformed, we won't
- // transform again
+ // add the column itself too, so that if it is already transformed, we won't transform again
+ fields.add(transformConfig.getColumnName());
}
}
ComplexTypeConfig complexTypeConfig = ingestionConfig.getComplexTypeConfig();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 83c890d5f9..43fd69488b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -73,6 +73,7 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.SchemaConformingTransformerConfig;
@@ -82,6 +83,8 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.recordenricher.RecordEnricherRegistry;
+import org.apache.pinot.spi.recordenricher.RecordEnricherValidationConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -505,6 +508,16 @@ public final class TableConfigUtils {
});
}
+ // Enrichment configs
+ List<EnrichmentConfig> enrichmentConfigs = ingestionConfig.getEnrichmentConfigs();
+ if (enrichmentConfigs != null) {
+ for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
+ RecordEnricherRegistry.validateEnrichmentConfig(enrichmentConfig,
+ new RecordEnricherValidationConfig(disableGroovy));
+ }
+ }
+
+
// Transform configs
List<TransformConfig> transformConfigs = ingestionConfig.getTransformConfigs();
if (transformConfigs != null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java
new file mode 100644
index 0000000000..5e58c3a3fd
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/annotations/RecordEnricherFactory.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.annotations;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Type-level annotation retained at runtime; presumably intended to mark record enricher factory classes.
+ * <p>NOTE(review): {@code org.apache.pinot.spi.recordenricher.RecordEnricherRegistry} discovers factories through
+ * {@link java.util.ServiceLoader}, not through this annotation. This annotation also shares its simple name with the
+ * interface {@code org.apache.pinot.spi.recordenricher.RecordEnricherFactory}. Confirm whether the annotation is
+ * used anywhere (e.g. classpath scanning), and consider renaming it to avoid the clash.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface RecordEnricherFactory {
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java
new file mode 100644
index 0000000000..9b295a78e4
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+/**
+ * Configuration of a single record enricher, as listed in the ingestion config's enrichment configs.
+ * <p>{@code enricherType} selects the factory registered in {@code RecordEnricherRegistry} under that type.
+ * {@code properties} is handed unchanged to that factory, both for validation and for creating the enricher.
+ * The accepted shape of {@code properties} is defined by each factory, not by this class.
+ */
+public class EnrichmentConfig extends BaseJsonConfig {
+  // Registry key selecting which enricher factory handles this config.
+  @JsonPropertyDescription("Enricher type")
+  private final String _enricherType;
+
+  // Factory-specific settings, kept as raw JSON so each enricher type can define its own schema.
+  @JsonPropertyDescription("Enricher properties")
+  private final JsonNode _properties;
+
+  /**
+   * Creates an enrichment config; used by Jackson when deserializing the table config.
+   * Neither argument is null-checked here.
+   *
+   * @param enricherType type name used to look up the enricher factory
+   * @param properties factory-specific properties for the enricher
+   */
+  @JsonCreator
+  public EnrichmentConfig(@JsonProperty("enricherType") String enricherType,
+      @JsonProperty("properties") JsonNode properties) {
+    _enricherType = enricherType;
+    _properties = properties;
+  }
+
+  /**
+   * Returns the type name used to look up the enricher factory (not null-checked).
+   */
+  public String getEnricherType() {
+    return _enricherType;
+  }
+
+  /**
+   * Returns the raw, factory-specific enricher properties (not null-checked).
+   */
+  public JsonNode getProperties() {
+    return _properties;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 86ff9cec6f..5af9cdcc50 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -39,6 +39,9 @@ public class IngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Config related to filtering records during ingestion")
private FilterConfig _filterConfig;
+ @JsonPropertyDescription("Config related to enriching records during ingestion")
+ private List<EnrichmentConfig> _enrichmentConfigs;
+
@JsonPropertyDescription("Configs related to record transformation functions applied during ingestion")
private List<TransformConfig> _transformConfigs;
@@ -63,12 +66,14 @@ public class IngestionConfig extends BaseJsonConfig {
@Deprecated
public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
@Nullable StreamIngestionConfig streamIngestionConfig, @Nullable FilterConfig filterConfig,
+ @Nullable List<EnrichmentConfig> enrichmentConfigs,
@Nullable List<TransformConfig> transformConfigs, @Nullable ComplexTypeConfig complexTypeConfig,
@Nullable SchemaConformingTransformerConfig schemaConformingTransformerConfig,
@Nullable List<AggregationConfig> aggregationConfigs) {
_batchIngestionConfig = batchIngestionConfig;
_streamIngestionConfig = streamIngestionConfig;
_filterConfig = filterConfig;
+ _enrichmentConfigs = enrichmentConfigs;
_transformConfigs = transformConfigs;
_complexTypeConfig = complexTypeConfig;
_schemaConformingTransformerConfig = schemaConformingTransformerConfig;
@@ -93,6 +98,11 @@ public class IngestionConfig extends BaseJsonConfig {
return _filterConfig;
}
+ @Nullable
+ public List<EnrichmentConfig> getEnrichmentConfigs() {
+ return _enrichmentConfigs;
+ }
+
@Nullable
public List<TransformConfig> getTransformConfigs() {
return _transformConfigs;
@@ -137,6 +147,10 @@ public class IngestionConfig extends BaseJsonConfig {
_filterConfig = filterConfig;
}
+ public void setEnrichmentConfigs(List<EnrichmentConfig> enrichmentConfigs) {
+ _enrichmentConfigs = enrichmentConfigs;
+ }
+
public void setTransformConfigs(List<TransformConfig> transformConfigs) {
_transformConfigs = transformConfigs;
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java
new file mode 100644
index 0000000000..8e48ed7102
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricher.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import java.util.List;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Interface for enriching records in place by computing additional columns from existing ones.
+ * <p>Ingestion code runs enrichers (through {@code RecordEnricherPipeline}) on each raw record before the
+ * record is handed to the transform pipeline.
+ * <p>The original contract states that "if a column with the same name as the input column already exists in the
+ * record, it will be overwritten". NOTE(review): this presumably refers to the columns an enricher writes (an
+ * existing value under an output column name is replaced), not to its input columns. Confirm against the
+ * implementations.
+ */
+public interface RecordEnricher {
+  /**
+   * Returns the list of input columns required for enriching the record.
+   * The pipeline collects these so that the record extractor also extracts them from the source data, even if
+   * they are not otherwise referenced by the schema.
+   */
+  List<String> getInputColumns();
+
+  /**
+   * Enriches the given record in place by adding the computed columns to that same record.
+   *
+   * @param record the record to read inputs from and write enriched columns to
+   */
+  void enrich(GenericRow record);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java
new file mode 100644
index 0000000000..adac697bdb
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherConfig.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+/**
+ * Common interface for enricher-specific configuration objects.
+ * <p>NOTE(review): the contract of {@link #parse()} is not documented here. Presumably it validates the config
+ * and/or derives internal state after deserialization. Confirm against the implementing configs.
+ */
+public interface RecordEnricherConfig {
+  void parse();
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java
new file mode 100644
index 0000000000..4b55d04260
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherFactory.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.IOException;
+
+
+/**
+ * Factory for one type of {@link RecordEnricher}.
+ * <p>Implementations are discovered by {@link RecordEnricherRegistry} through {@link java.util.ServiceLoader}
+ * and registered under {@link #getEnricherType()}.
+ */
+public interface RecordEnricherFactory {
+  /**
+   * Returns the type name this factory is registered under. Enrichment configs select the factory by this name.
+   */
+  String getEnricherType();
+  /**
+   * Creates an enricher from the {@code properties} node of an enrichment config.
+   *
+   * @param enricherProps the raw enricher properties
+   * @return a new enricher instance
+   * @throws IOException presumably when the properties cannot be read into the enricher's config; the pipeline
+   *         wraps it in a {@link RuntimeException}
+   */
+  RecordEnricher createEnricher(JsonNode enricherProps) throws IOException;
+  /**
+   * Validates the {@code properties} node of an enrichment config during table config validation.
+   * NOTE(review): presumably signals an invalid config by throwing an unchecked exception; confirm against the
+   * implementations.
+   *
+   * @param enricherProps the raw enricher properties
+   * @param validationConfig cluster-level constraints to validate against
+   */
+  void validateEnrichmentConfig(JsonNode enricherProps, RecordEnricherValidationConfig validationConfig);
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java
new file mode 100644
index 0000000000..5a50d685ca
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherPipeline.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Ordered chain of {@link RecordEnricher}s applied to each ingested {@link GenericRow}.
+ * <p>Enrichers run in the order they were added. For pipelines built from an ingestion config, that is the order
+ * of the enrichment configs. The pipeline also tracks the union of all enrichers' input columns, so callers can
+ * make sure those fields are extracted from the source data.
+ */
+public class RecordEnricherPipeline {
+  private final List<RecordEnricher> _enrichers = new ArrayList<>();
+  private final Set<String> _columnsToExtract = new HashSet<>();
+
+  /**
+   * Returns a new pipeline without enrichers; {@link #run(GenericRow)} leaves records unchanged.
+   */
+  public static RecordEnricherPipeline getPassThroughPipeline() {
+    return new RecordEnricherPipeline();
+  }
+
+  /**
+   * Builds a pipeline with one enricher per enrichment config, in config order.
+   *
+   * @param ingestionConfig the ingestion config; may be {@code null}, which yields a pass-through pipeline
+   * @return the pipeline
+   * @throws IllegalArgumentException if a config references an enricher type that is not registered
+   * @throws RuntimeException if an enricher factory fails with an {@link IOException}
+   */
+  public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingestionConfig) {
+    RecordEnricherPipeline pipeline = new RecordEnricherPipeline();
+    if (ingestionConfig == null) {
+      return pipeline;
+    }
+    List<EnrichmentConfig> enrichmentConfigs = ingestionConfig.getEnrichmentConfigs();
+    if (enrichmentConfigs == null) {
+      return pipeline;
+    }
+    for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
+      try {
+        pipeline.add(RecordEnricherRegistry.createRecordEnricher(enrichmentConfig));
+      } catch (IOException e) {
+        throw new RuntimeException("Failed to instantiate record enricher " + enrichmentConfig.getEnricherType(), e);
+      }
+    }
+    return pipeline;
+  }
+
+  /**
+   * Builds a pipeline from the table's ingestion config.
+   *
+   * @param tableConfig the table config; may be {@code null}, which yields a pass-through pipeline (consistent with
+   *                    {@link #fromIngestionConfig(IngestionConfig)} accepting a {@code null} ingestion config)
+   * @return the pipeline
+   */
+  public static RecordEnricherPipeline fromTableConfig(TableConfig tableConfig) {
+    if (tableConfig == null) {
+      return getPassThroughPipeline();
+    }
+    return fromIngestionConfig(tableConfig.getIngestionConfig());
+  }
+
+  /**
+   * Returns a read-only view of the union of input columns required by the enrichers added so far.
+   * The view reflects enrichers added later; callers that need to modify the set must copy it.
+   */
+  public Set<String> getColumnsToExtract() {
+    return Collections.unmodifiableSet(_columnsToExtract);
+  }
+
+  /**
+   * Appends an enricher to the end of the pipeline and records its input columns for extraction.
+   *
+   * @param enricher the enricher to add
+   */
+  public void add(RecordEnricher enricher) {
+    _enrichers.add(enricher);
+    _columnsToExtract.addAll(enricher.getInputColumns());
+  }
+
+  /**
+   * Runs every enricher, in order, on the given record, mutating it in place.
+   *
+   * @param record the record to enrich
+   */
+  public void run(GenericRow record) {
+    for (RecordEnricher enricher : _enrichers) {
+      enricher.enrich(record);
+    }
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java
new file mode 100644
index 0000000000..59c1e6fb34
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherRegistry.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ServiceLoader;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static registry of {@link RecordEnricherFactory} implementations, keyed by
+ * {@link RecordEnricherFactory#getEnricherType()}.
+ * <p>Factories are discovered once, during class initialization, via {@link ServiceLoader}. After that the
+ * registry is only read.
+ */
+public class RecordEnricherRegistry {
+  private static final Logger LOGGER = LoggerFactory.getLogger(RecordEnricherRegistry.class);
+  private static final Map<String, RecordEnricherFactory> RECORD_ENRICHER_FACTORY_MAP = new HashMap<>();
+
+  static {
+    for (RecordEnricherFactory recordEnricherFactory : ServiceLoader.load(RecordEnricherFactory.class)) {
+      String enricherType = recordEnricherFactory.getEnricherType();
+      LOGGER.info("Registered record enricher factory type: {}", enricherType);
+      RecordEnricherFactory previousFactory = RECORD_ENRICHER_FACTORY_MAP.put(enricherType, recordEnricherFactory);
+      if (previousFactory != null) {
+        // Keep the last-registered factory (same as before), but make the silent override visible.
+        LOGGER.warn("Record enricher factory type: {} registered by both {} and {}, using the latter", enricherType,
+            previousFactory.getClass().getName(), recordEnricherFactory.getClass().getName());
+      }
+    }
+  }
+
+  private RecordEnricherRegistry() {
+  }
+
+  /**
+   * Validates an enrichment config with the factory registered for its enricher type.
+   *
+   * @param enrichmentConfig the enrichment config to validate
+   * @param config cluster-level constraints to validate against
+   * @throws IllegalArgumentException if no factory is registered for the config's enricher type
+   */
+  public static void validateEnrichmentConfig(EnrichmentConfig enrichmentConfig,
+      RecordEnricherValidationConfig config) {
+    getFactory(enrichmentConfig.getEnricherType()).validateEnrichmentConfig(enrichmentConfig.getProperties(), config);
+  }
+
+  /**
+   * Creates a record enricher with the factory registered for the config's enricher type.
+   *
+   * @param enrichmentConfig the enrichment config describing the enricher
+   * @return a new enricher instance
+   * @throws IllegalArgumentException if no factory is registered for the config's enricher type
+   * @throws IOException if the factory fails to create the enricher
+   */
+  public static RecordEnricher createRecordEnricher(EnrichmentConfig enrichmentConfig)
+      throws IOException {
+    return getFactory(enrichmentConfig.getEnricherType()).createEnricher(enrichmentConfig.getProperties());
+  }
+
+  /**
+   * Looks up the factory for the given type, failing fast if none is registered.
+   */
+  private static RecordEnricherFactory getFactory(String enricherType) {
+    RecordEnricherFactory factory = RECORD_ENRICHER_FACTORY_MAP.get(enricherType);
+    if (factory == null) {
+      throw new IllegalArgumentException("No record enricher found for type: " + enricherType);
+    }
+    return factory;
+  }
+}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java
new file mode 100644
index 0000000000..86fb74155e
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/recordenricher/RecordEnricherValidationConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.recordenricher;
+
+
+/**
+ * Cluster-level constraints passed to record enricher factories when validating enrichment configs
+ * (see {@link RecordEnricherFactory#validateEnrichmentConfig}).
+ */
+public class RecordEnricherValidationConfig {
+  private final boolean _groovyDisabled;
+
+  /**
+   * @param groovyDisabled whether Groovy is disabled for the cluster; table config validation passes its
+   *                       {@code disableGroovy} flag here
+   */
+  public RecordEnricherValidationConfig(boolean groovyDisabled) {
+    _groovyDisabled = groovyDisabled;
+  }
+
+  /**
+   * Returns whether Groovy is disabled, so enrichers relying on Groovy expressions can reject their config.
+   */
+  public boolean isGroovyDisabled() {
+    return _groovyDisabled;
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org