You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by "ege-st (via GitHub)" <gi...@apache.org> on 2024/02/13 22:24:41 UTC

Re: [PR] Record enricher [pinot]

ege-st commented on code in PR #12243:
URL: https://github.com/apache/pinot/pull/12243#discussion_r1488632692


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/RecordEnricherPipeline.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class RecordEnricherPipeline {
+  private final List<RecordEnricher> _enrichers = new ArrayList<>();
+  private final Set<String> _columnsToExtract = new HashSet<>();
+
+  public static RecordEnricherPipeline getPassThroughPipeline() {
+    return new RecordEnricherPipeline();
+  }
+
+  public static RecordEnricherPipeline fromIngestionConfig(IngestionConfig ingestionConfig) {
+    RecordEnricherPipeline pipeline = new RecordEnricherPipeline();
+    if (null == ingestionConfig || null == ingestionConfig.getEnrichmentConfigs()) {
+      return pipeline;
+    }
+    List<EnrichmentConfig> enrichmentConfigs = ingestionConfig.getEnrichmentConfigs();
+    for (EnrichmentConfig enrichmentConfig : enrichmentConfigs) {
+      try {
+        RecordEnricher enricher = (RecordEnricher) Class.forName(enrichmentConfig.getEnricherClassName()).newInstance();
+        enricher.init(enrichmentConfig.getProperties());
+        pipeline.add(enricher);
+      } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+        throw new RuntimeException("Failed to instantiate record enricher" + enrichmentConfig.getEnricherClassName(),

Review Comment:
   ```suggestion
           throw new RuntimeException("Failed to instantiate record enricher: " + enrichmentConfig.getEnricherClassName(),
   ```
   I believe that, as is, the class name will be concatenated directly onto the word `enricher`, which will make these error messages hard to read.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);
+    }
+  }
+
+  private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) {
+    String logtype = null;
+    Object[] dictVars = null;
+    Object[] encodedVars = null;
+    if (null != value) {
+      if (!(value instanceof String)) {
+        LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
+            value.getClass().getSimpleName(), key, value);
+      } else {
+        String valueAsString = (String) value;
+        try {
+          _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
+          logtype = _clpEncodedMessage.getLogTypeAsString();
+          encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+          dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+        } catch (IOException e) {
+          LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
+              e.getMessage());
+        }

Review Comment:
   I think this would be more readable with the success path as the first branch:
   ```suggestion
         if (value instanceof String) {
           try {
             _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
             logtype = _clpEncodedMessage.getLogTypeAsString();
             encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
             dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
           } catch (IOException e) {
             LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
                 e.getMessage());
           }
         } else {
           LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
               value.getClass().getSimpleName(), key, value);
           String valueAsString = (String) value;
   
   ```



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/function/CustomFunctionEnricher.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.recordenricher.function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class CustomFunctionEnricher extends RecordEnricher {
+  private Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private List<String> _fieldsToExtract;
+
+  @Override
+  public void init(Map<String, String> enricherProps) {
+    _fieldToFunctionEvaluator = new HashMap<>();
+    _fieldsToExtract = new ArrayList<>();
+    enricherProps.forEach((k, v) -> {
+      _fieldToFunctionEvaluator.put(k, FunctionEvaluatorFactory.getExpressionEvaluator(v));
+      _fieldsToExtract.addAll(_fieldToFunctionEvaluator.get(k).getArguments());

Review Comment:
   Why not use an intermediate?
   ```suggestion
         FunctionEvaluator exprEval = FunctionEvaluatorFactory.getExpressionEvaluator(v);
         _fieldToFunctionEvaluator.put(k, exprEval);
         _fieldsToExtract.addAll(exprEval.getArguments());
   ```
   
   This way you avoid having the HashMap lookup on a value that you have created within this scope.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/function/CustomFunctionEnricher.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.recordenricher.function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class CustomFunctionEnricher extends RecordEnricher {
+  private Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private List<String> _fieldsToExtract;

Review Comment:
   I think these can be made `final`, thus ensuring they are only instantiated once during the ctor and never reassigned (a small thing, but enforcing expected behavior with the compiler is a good habit to have).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -157,17 +159,20 @@ public RecordReader getRecordReader() {
   public void init(SegmentGeneratorConfig config, RecordReader recordReader)
       throws Exception {
     SegmentCreationDataSource dataSource = new RecordReaderSegmentCreationDataSource(recordReader);
-    init(config, dataSource, new TransformPipeline(config.getTableConfig(), config.getSchema()));
+    init(config, dataSource, RecordEnricherPipeline.fromTableConfig(config.getTableConfig()),
+        new TransformPipeline(config.getTableConfig(), config.getSchema()));

Review Comment:
   Does the `RecordEnricherPipeline` evaluate before the `TransformPipeline` or after? If it's after, then we should have it come after in the ctor parameter list, so that the APIs help reinforce the logical behavior.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);

Review Comment:
   What happens if one of the fields contains an illegal character (e.g., `fo"o,ba$r`)? Are there illegal characters for field names? Or what if we have a field list like `foo,,,bar,,`?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;

Review Comment:
   I think both of these can be made `final`, which will help enforce that they should only be assigned an instance once each (when the ctor is called).



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);

Review Comment:
   One subtle consequence of this pattern we use of modifying the input `GenericRow` is that if we have a fault during the modification, then we're left with a partially updated `GenericRow`. Hopefully, we always drop the write when a pipeline fails; otherwise, we could get very difficult-to-debug 'broken' data written to a segment.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/SegmentIndexCreationDriverImpl.java:
##########
@@ -178,10 +183,12 @@ public void init(SegmentGeneratorConfig config, SegmentCreationDataSource dataSo
     if (config.isFailOnEmptySegment()) {
       Preconditions.checkState(_recordReader.hasNext(), "No record in data source");
     }
+    _recordEnricherPipeline = enricherPipeline;
     _transformPipeline = transformPipeline;
     // Use the same transform pipeline if the data source is backed by a record reader
     if (dataSource instanceof RecordReaderSegmentCreationDataSource) {
       ((RecordReaderSegmentCreationDataSource) dataSource).setTransformPipeline(transformPipeline);
+      ((RecordReaderSegmentCreationDataSource) dataSource).setRecordEnricherPipeline(enricherPipeline);

Review Comment:
   Likewise, if `RecordEnricherPipeline` is executed before `TransformPipeline`, then let's have it consistently occur before `TransformPipeline`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }

Review Comment:
   It's usually more readable to have the success path be the first branch and the failure path be the else branch.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/EnrichmentConfig.java:
##########
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table.ingestion;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.spi.config.BaseJsonConfig;
+
+
+public class EnrichmentConfig extends BaseJsonConfig {
+  @JsonPropertyDescription("Enricher class name")
+  private final String _enricherClassName;
+
+  @JsonPropertyDescription("Enricher properties")
+  private final Map<String, String> _properties;

Review Comment:
   What are some examples of properties we might have?  The only one I saw in the code was `fieldsForClpEncoding` for `CLPEncodingEnricher`. Are there others? 
   
   One concern I have is how hard it will be to determine what valid properties are and what required properties are from the Config Schema.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    if (StringUtils.isEmpty(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    } else {
+      _fields = List.of(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR));
+    }
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);
+    }
+  }
+
+  private void enrichWithClpEncodedFields(String key, Object value, GenericRow to) {
+    String logtype = null;
+    Object[] dictVars = null;
+    Object[] encodedVars = null;
+    if (null != value) {
+      if (!(value instanceof String)) {
+        LOGGER.error("Can't encode value of type {} with CLP. name: '{}', value: '{}'",
+            value.getClass().getSimpleName(), key, value);
+      } else {
+        String valueAsString = (String) value;
+        try {
+          _clpMessageEncoder.encodeMessage(valueAsString, _clpEncodedMessage);
+          logtype = _clpEncodedMessage.getLogTypeAsString();
+          encodedVars = _clpEncodedMessage.getEncodedVarsAsBoxedLongs();
+          dictVars = _clpEncodedMessage.getDictionaryVarsAsStrings();
+        } catch (IOException e) {
+          LOGGER.error("Can't encode field with CLP. name: '{}', value: '{}', error: {}", key, valueAsString,
+              e.getMessage());
+        }
+      }
+    }
+
+    to.putValue(key + ClpRewriter.LOGTYPE_COLUMN_SUFFIX, logtype);
+    to.putValue(key + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX, dictVars);
+    to.putValue(key + ClpRewriter.ENCODED_VARS_COLUMN_SUFFIX, encodedVars);

Review Comment:
   These will be added as columns within the table?  What happens if there's a collision (e.g., a user gives a schema name that is the same as one of these dynamically constructed names)? Is that possible?  If not, let me know why so I understand Pinot better.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/function/CustomFunctionEnricher.java:
##########
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.segment.local.recordenricher.function;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.segment.local.function.FunctionEvaluator;
+import org.apache.pinot.segment.local.function.FunctionEvaluatorFactory;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+public class CustomFunctionEnricher extends RecordEnricher {
+  private Map<String, FunctionEvaluator> _fieldToFunctionEvaluator;
+  private List<String> _fieldsToExtract;
+
+  @Override
+  public void init(Map<String, String> enricherProps) {
+    _fieldToFunctionEvaluator = new HashMap<>();
+    _fieldsToExtract = new ArrayList<>();
+    enricherProps.forEach((k, v) -> {
+      _fieldToFunctionEvaluator.put(k, FunctionEvaluatorFactory.getExpressionEvaluator(v));
+      _fieldsToExtract.addAll(_fieldToFunctionEvaluator.get(k).getArguments());
+    });
+  }
+
+  /**
+   * Returns the evaluator argument names collected in {@code init}; the backing list itself is returned, not a copy.
+   */
+  @Override
+  public List<String> getInputColumns() {
+    return _fieldsToExtract;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    _fieldToFunctionEvaluator.forEach((field, evaluator) -> {
+      record.putValue(field, evaluator.evaluate(record));
+    });

Review Comment:
   Is it possible for an exception to get thrown by any of the `evaluator`s? If so, should we catch here (as was done in CLP) and log a message before propagating up?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordenricher/clp/CLPEncodingEnricher.java:
##########
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.recordenricher.clp;
+
+import com.yscope.clp.compressorfrontend.BuiltInVariableHandlingRuleVersions;
+import com.yscope.clp.compressorfrontend.EncodedMessage;
+import com.yscope.clp.compressorfrontend.MessageEncoder;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.segment.local.recordenricher.RecordEnricher;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.sql.parsers.rewriter.ClpRewriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CLPEncodingEnricher extends RecordEnricher {
+  public static final String FIELDS_FOR_CLP_ENCODING_CONFIG_KEY = "fieldsForClpEncoding";
+  public static final String FIELDS_FOR_CLP_ENCODING_SEPARATOR = ",";
+  private static final Logger LOGGER = LoggerFactory.getLogger(CLPEncodingEnricher.class);
+
+  private List<String> _fields;
+  private EncodedMessage _clpEncodedMessage;
+  private MessageEncoder _clpMessageEncoder;
+
+  /**
+   * Reads the list of fields to CLP-encode from the enricher properties and creates the CLP message encoder.
+   *
+   * @param enricherProperties must contain {@link #FIELDS_FOR_CLP_ENCODING_CONFIG_KEY}, whose value is a list of
+   *     field names separated by {@link #FIELDS_FOR_CLP_ENCODING_SEPARATOR}
+   * @throws IllegalArgumentException if the property is missing or blank
+   */
+  @Override
+  public void init(Map<String, String> enricherProperties) {
+    String concatenatedFieldNames = enricherProperties.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    // A whitespace-only value carries no field names, so reject it the same way as a missing one.
+    if (StringUtils.isBlank(concatenatedFieldNames)) {
+      throw new IllegalArgumentException("Missing required property: " + FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
+    }
+    // Strip whitespace around each name so that a value such as "a, b" yields "a" and "b"
+    // rather than "a" and " b".
+    _fields = List.of(StringUtils.stripAll(concatenatedFieldNames.split(FIELDS_FOR_CLP_ENCODING_SEPARATOR)));
+
+    _clpEncodedMessage = new EncodedMessage();
+    _clpMessageEncoder = new MessageEncoder(BuiltInVariableHandlingRuleVersions.VariablesSchemaV2,
+        BuiltInVariableHandlingRuleVersions.VariableEncodingMethodsV1);
+  }
+
+  /**
+   * Returns the field names parsed from the enricher properties in {@code init}.
+   */
+  @Override
+  public List<String> getInputColumns() {
+    return _fields;
+  }
+
+  @Override
+  public void enrich(GenericRow record) {
+    try {
+      for (String field : _fields) {
+        Object value = record.getValue(field);
+        if (value != null) {
+          enrichWithClpEncodedFields(field, value, record);
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Failed to enrich record: {}", record);

Review Comment:
   Obviously, outside the scope of this PR, but just occurred to me and wanted to write it down to get feedback.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org