Posted to commits@pinot.apache.org by "rohityadav1993 (via GitHub)" <gi...@apache.org> on 2023/11/09 16:20:19 UTC

[PR] Custom merger [pinot]

rohityadav1993 opened a new pull request, #11983:
URL: https://github.com/apache/pinot/pull/11983



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


Re: [PR] pluggable partial upsert merger [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1550949947


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -32,76 +31,79 @@
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
-    for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
-        } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();

Review Comment:
   (minor) Clear the result on the caller side
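
A minimal sketch of what clearing on the caller side could look like. The `RowMerger` interface and the plain `Map` rows are hypothetical stand-ins for illustration, not Pinot's `PartialUpsertMerger`, `LazyRow`, or `GenericRow`.

```java
import java.util.Map;

final class ReuseResultSketch {
  /** Hypothetical stand-in for a merger that writes merged values into a caller-owned map. */
  interface RowMerger {
    void merge(Map<String, Object> prevRow, Map<String, Object> newRow, Map<String, Object> result);
  }

  /** Handler side: no clear() here, the map is assumed to arrive empty. */
  static void merge(RowMerger merger, Map<String, Object> prevRow, Map<String, Object> newRow,
      Map<String, Object> reuseResult) {
    merger.merge(prevRow, newRow, reuseResult);
  }

  /** Caller side: the caller owns the reusable map and clears it before each record. */
  static void consume(RowMerger merger, Map<String, Object> prevRow, Map<String, Object> newRow,
      Map<String, Object> reuseResult) {
    reuseResult.clear();
    merge(merger, prevRow, newRow, reuseResult);
  }
}
```

With this split the object that allocates the map is also the one responsible for resetting it, so the handler does not need to know that the map is reused.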



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
     PartialUpsertHandler partialUpsertHandler = null;
     if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
       Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
-      Preconditions.checkArgument(partialUpsertStrategies != null,
+      String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass();
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null,
           "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
       partialUpsertHandler =
-          new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
-              comparisonColumns);
+          new PartialUpsertHandler(schema, comparisonColumns, upsertConfig);

Review Comment:
   (format) Please auto-reformat all the changes with [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide)



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -32,76 +31,79 @@
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
-    for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
-        } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
+    if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) {
+      // iterate over all columns in prevRecord and update newRecord with merged values
+      for (String column : prevRecord.getColumnNames()) {
+        // no merger to apply on primary key columns
+        if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) {
+          continue;
+        }
+
+        // use merged column value from result map
+        if (reuseMergerResult.containsKey(column)) {
+          Object mergedValue = reuseMergerResult.get(column);
+          setMergedValue(newRecord, column, mergedValue);
+        }
+      }
+    } else {
+      // iterate over only merger results and update newRecord with merged values
+      for (Map.Entry<String, Object> entry : reuseMergerResult.entrySet()) {
+        // skip if primary key column
+        String column = entry.getKey();
+        if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) {
+          continue;
         }
+
+        Object mergedValue = entry.getValue();
+        setMergedValue(newRecord, column, mergedValue);
+      }
+    }
+
+    // handle comparison columns
+    for (String column: _comparisonColumns) {
+      if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+        newRecord.putValue(column, prevRecord.getValue(column));
+        newRecord.removeNullValueField(column);
       }
     }
   }
+
+  private void setMergedValue(GenericRow newRecord, String column, Object mergedValue) {
+    if (mergedValue != null) {
+      // remove null value field if it was set
+      newRecord.removeNullValueField(column);
+      newRecord.putValue(column, mergedValue);
+    } else {
+      // if column exists but mapped to a null value then merger result was a null value
+      newRecord.addNullValueField(column);
+      newRecord.putValue(column, null);

Review Comment:
   (MAJOR) This won't work. In order to set a value to null, you'll need to call `putDefaultNullValue()` with a default null value. Do you see a scenario where you want to explicitly set a value to `null`?
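
A minimal sketch of the alternative the comment points at: on a null merge result, store a default null value and mark the column as null in one step. The `Row` class below is a simplified, hypothetical stand-in and not Pinot's `GenericRow`; only the method name `putDefaultNullValue` comes from the comment, and the `defaultNullValue` parameter is an assumption about where that value would come from.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class NullValueSketch {
  /** Hypothetical row that tracks null columns separately from stored values. */
  static final class Row {
    final Map<String, Object> values = new HashMap<>();
    final Set<String> nullFields = new HashSet<>();

    void putValue(String column, Object value) {
      values.put(column, value);
    }

    void removeNullValueField(String column) {
      nullFields.remove(column);
    }

    /** Stores the default null value and marks the column as null. */
    void putDefaultNullValue(String column, Object defaultNullValue) {
      values.put(column, defaultNullValue);
      nullFields.add(column);
    }
  }

  /** Applies one merged value; a null result becomes "default value + null marker". */
  static void setMergedValue(Row row, String column, Object mergedValue, Object defaultNullValue) {
    if (mergedValue != null) {
      row.putValue(column, mergedValue);
      row.removeNullValueField(column);
    } else {
      row.putDefaultNullValue(column, defaultNullValue);
    }
  }
}
```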



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -32,76 +31,79 @@
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
-    for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
-        } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
+    if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) {

Review Comment:
   Do we need to differentiate `PartialUpsertColumnarMerger` and custom merger? The logic should be the same?
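
A sketch of a single code path that does not depend on the merger type: iterate over whatever the merger wrote into the result map and skip primary-key and comparison columns. Rows are modelled as plain maps here, which is a simplification for illustration only.

```java
import java.util.List;
import java.util.Map;

final class UniformApplySketch {
  /** Copies merged values into newRow, leaving primary-key and comparison columns untouched. */
  static void applyMergedValues(Map<String, Object> newRow, Map<String, Object> mergerResult,
      List<String> primaryKeyColumns, List<String> comparisonColumns) {
    for (Map.Entry<String, Object> entry : mergerResult.entrySet()) {
      String column = entry.getKey();
      if (primaryKeyColumns.contains(column) || comparisonColumns.contains(column)) {
        continue;
      }
      newRow.put(column, entry.getValue());
    }
  }
}
```

Because only columns present in the result map are touched, the same loop would serve a column-wise merger and a custom row merger alike.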



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
     PartialUpsertHandler partialUpsertHandler = null;
     if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
       Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
-      Preconditions.checkArgument(partialUpsertStrategies != null,
+      String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass();
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null,

Review Comment:
   (minor) These checks can be pushed down to the constructor of `PartialUpsertHandler`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java:
##########
@@ -18,39 +18,50 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 
 
 public class PartialUpsertMergerFactory {
+
   private PartialUpsertMergerFactory() {
   }
 
-  private static final AppendMerger APPEND_MERGER = new AppendMerger();
-  private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
-  private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger();
-  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
-  private static final MaxMerger MAX_MERGER = new MaxMerger();
-  private static final MinMerger MIN_MERGER = new MinMerger();
-  private static final UnionMerger UNION_MERGER = new UnionMerger();
-
-  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
-    switch (strategy) {
-      case APPEND:
-        return APPEND_MERGER;
-      case INCREMENT:
-        return INCREMENT_MERGER;
-      case IGNORE:
-        return IGNORE_MERGER;
-      case MAX:
-        return MAX_MERGER;
-      case MIN:
-        return MIN_MERGER;
-      case OVERWRITE:
-        return OVERWRITE_MERGER;
-      case UNION:
-        return UNION_MERGER;
-      default:
-        throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
+  /**
+   * Initialise the default partial upsert merger or initialise a custom implementation from a given class name in
+   * config
+   * @param primaryKeyColumns
+   * @param comparisonColumns
+   * @param upsertConfig
+   * @return
+   */
+  public static PartialUpsertMerger getPartialUpsertMerger(List<String> primaryKeyColumns,
+      List<String> comparisonColumns, UpsertConfig upsertConfig) {
+    PartialUpsertMerger partialUpsertMerger;
+    String customRowMerger = upsertConfig.getPartialUpsertMergerClass();
+    // If a custom implementation is provided in config, initialize an implementation and return.
+    if (StringUtils.isNotBlank(customRowMerger)) {
+      try {
+        Class<?> partialUpsertMergerClass = Class.forName(customRowMerger);
+        if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) {

Review Comment:
   Why does it have to extend `BasePartialUpsertMerger`? Implementing `PartialUpsertMerger` should be good right?
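
To illustrate the difference between the two checks: `Class.isAssignableFrom` against a base class rejects implementations that only implement the interface, while the same check against the interface accepts both. The types below are hypothetical placeholders, not the Pinot classes.

```java
final class InterfaceCheckSketch {
  interface Merger {
  }

  abstract static class BaseMerger implements Merger {
  }

  static final class ExtendsBase extends BaseMerger {
  }

  static final class ImplementsOnly implements Merger {
  }

  /** Stricter check: only subclasses of the base class pass. */
  static boolean passesBaseClassCheck(Class<?> candidate) {
    return BaseMerger.class.isAssignableFrom(candidate);
  }

  /** Looser check: any implementation of the interface passes. */
  static boolean passesInterfaceCheck(Class<?> candidate) {
    return Merger.class.isAssignableFrom(candidate);
  }
}
```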



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java:
##########
@@ -18,39 +18,50 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 
 
 public class PartialUpsertMergerFactory {
+
   private PartialUpsertMergerFactory() {
   }
 
-  private static final AppendMerger APPEND_MERGER = new AppendMerger();
-  private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
-  private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger();
-  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
-  private static final MaxMerger MAX_MERGER = new MaxMerger();
-  private static final MinMerger MIN_MERGER = new MinMerger();
-  private static final UnionMerger UNION_MERGER = new UnionMerger();
-
-  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
-    switch (strategy) {
-      case APPEND:
-        return APPEND_MERGER;
-      case INCREMENT:
-        return INCREMENT_MERGER;
-      case IGNORE:
-        return IGNORE_MERGER;
-      case MAX:
-        return MAX_MERGER;
-      case MIN:
-        return MIN_MERGER;
-      case OVERWRITE:
-        return OVERWRITE_MERGER;
-      case UNION:
-        return UNION_MERGER;
-      default:
-        throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
+  /**
+   * Initialise the default partial upsert merger or initialise a custom implementation from a given class name in
+   * config
+   * @param primaryKeyColumns
+   * @param comparisonColumns
+   * @param upsertConfig
+   * @return
+   */
+  public static PartialUpsertMerger getPartialUpsertMerger(List<String> primaryKeyColumns,
+      List<String> comparisonColumns, UpsertConfig upsertConfig) {
+    PartialUpsertMerger partialUpsertMerger;
+    String customRowMerger = upsertConfig.getPartialUpsertMergerClass();
+    // If a custom implementation is provided in config, initialize an implementation and return.
+    if (StringUtils.isNotBlank(customRowMerger)) {
+      try {
+        Class<?> partialUpsertMergerClass = Class.forName(customRowMerger);
+        if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) {
+          throw new RuntimeException(
+              "Provided partialUpsertMergerClass is not an implementation of BasePartialUpsertMerger.class");
+        }
+        partialUpsertMerger =
+            (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class)
+                .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig);
+      } catch (ClassNotFoundException
+               | NoSuchMethodException | InstantiationException | IllegalAccessException
+               | InvocationTargetException e) {

Review Comment:
   (minor) These can be merged as `Exception e`
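
A sketch of reflective loading with one `catch (Exception e)` in place of the multi-catch. `Merger`, `SampleMerger`, and the one-argument constructor are hypothetical and chosen only to keep the example self-contained.

```java
import java.util.List;

final class MergerLoaderSketch {
  interface Merger {
    List<String> primaryKeys();
  }

  /** Hypothetical implementation with a public constructor that reflection can find. */
  public static final class SampleMerger implements Merger {
    private final List<String> _primaryKeyColumns;

    public SampleMerger(List<String> primaryKeyColumns) {
      _primaryKeyColumns = primaryKeyColumns;
    }

    @Override
    public List<String> primaryKeys() {
      return _primaryKeyColumns;
    }
  }

  /** Loads and instantiates the class; every failure is wrapped with the class name for context. */
  static Merger load(String className, List<String> primaryKeyColumns) {
    try {
      return Class.forName(className).asSubclass(Merger.class).getConstructor(List.class)
          .newInstance(primaryKeyColumns);
    } catch (Exception e) {
      throw new RuntimeException("Could not load merger class: " + className, e);
    }
  }
}
```

The single catch also covers the `ClassCastException` that `asSubclass` throws when the class does not implement the interface, and the original exception stays available as the cause.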





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1551701865


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
     PartialUpsertHandler partialUpsertHandler = null;
     if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
       Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
-      Preconditions.checkArgument(partialUpsertStrategies != null,
+      String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass();
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null,

Review Comment:
   If the checks are moved down to `PartialUpsertHandler`, the create-table API call returns success but the table then goes into an error state. Keeping the check here fails table creation proactively.
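
A rough sketch of the kind of early check being argued for: reject the configuration before anything is constructed when neither a merger class nor strategies are present. The method name and the use of plain strings for strategies are assumptions made for illustration.

```java
import java.util.Map;

final class UpsertConfigValidationSketch {
  /** Throws if a partial-upsert table has neither a merger class nor per-column strategies. */
  static void validatePartialUpsert(String mergerClass, Map<String, String> strategies, String tableName) {
    boolean hasMergerClass = mergerClass != null && !mergerClass.trim().isEmpty();
    if (!hasMergerClass && strategies == null) {
      throw new IllegalArgumentException(
          "Either a merger class or partial-upsert strategies must be configured for table: " + tableName);
    }
  }
}
```

Running such a check while the table config is validated surfaces the error in the API response, which is the behaviour the comment wants to keep.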





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on PR #11983:
URL: https://github.com/apache/pinot/pull/11983#issuecomment-1986978948

   The integration test failure is in the delete-table flow, which is unrelated to the proposed changes.




Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1528985173


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java:
##########
@@ -918,32 +918,43 @@ static void validatePartialUpsertStrategies(TableConfig tableConfig, Schema sche
     UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
     assert upsertConfig != null;
     Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
+    String partialUpsertMergerClass = upsertConfig.getPartialUpsertMergerClass();
+

Review Comment:
   @Jackie-Jiang, @deemoliu, we may want to disallow defining `defaultPartialUpsertStrategy` when `partialUpsertMergerClass` is used. This would require changing the default initialization in `UpsertConfig` (`private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE;`) and handling that initialization in `POST /tableConfigs`.
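
A rough sketch of the proposal above, assuming a hypothetical `UpsertConfigSketch` class in which the default strategy is nullable so that "not set" can be told apart from "explicitly OVERWRITE". The field names and the enum subset are illustrative assumptions, not Pinot's real `UpsertConfig`.

```java
// Hypothetical, simplified config holder; Pinot's real UpsertConfig has more fields and different accessors.
final class UpsertConfigSketch {
  enum Strategy { OVERWRITE, INCREMENT, IGNORE } // illustrative subset only

  private final String mergerClass;       // null when no custom merger is configured
  private final Strategy defaultStrategy; // null means "not set by the user"

  UpsertConfigSketch(String mergerClass, Strategy defaultStrategy) {
    this.mergerClass = mergerClass;
    this.defaultStrategy = defaultStrategy;
  }

  // Rejects configs that set both a custom merger class and a default column strategy.
  void validate() {
    if (mergerClass != null && defaultStrategy != null) {
      throw new IllegalStateException(
          "defaultPartialUpsertStrategy must not be set together with partialUpsertMergerClass");
    }
  }

  // Falls back to OVERWRITE only at read time, so an unset value stays distinguishable.
  Strategy effectiveDefaultStrategy() {
    return defaultStrategy != null ? defaultStrategy : Strategy.OVERWRITE;
  }

  public static void main(String[] args) {
    new UpsertConfigSketch("com.example.MyMerger", null).validate(); // accepted
    System.out.println(new UpsertConfigSketch(null, null).effectiveDefaultStrategy());
  }
}
```

With an eagerly initialized default, the validator cannot know whether the user wrote OVERWRITE or simply omitted the field, which is why the sketch defers the fallback to the accessor.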





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang merged PR #11983:
URL: https://github.com/apache/pinot/pull/11983




Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1551615323


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -32,76 +31,79 @@
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
-    for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
-        } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
+    if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) {
+      // iterate over all columns in prevRecord and update newRecord with merged values
+      for (String column : prevRecord.getColumnNames()) {
+        // no merger to apply on primary key columns
+        if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) {
+          continue;
+        }
+
+        // use merged column value from result map
+        if (reuseMergerResult.containsKey(column)) {
+          Object mergedValue = reuseMergerResult.get(column);
+          setMergedValue(newRecord, column, mergedValue);
+        }
+      }
+    } else {
+      // iterate over only merger results and update newRecord with merged values
+      for (Map.Entry<String, Object> entry : reuseMergerResult.entrySet()) {
+        // skip if primary key column
+        String column = entry.getKey();
+        if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) {
+          continue;
         }
+
+        Object mergedValue = entry.getValue();
+        setMergedValue(newRecord, column, mergedValue);
+      }
+    }
+
+    // handle comparison columns
+    for (String column: _comparisonColumns) {
+      if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+        newRecord.putValue(column, prevRecord.getValue(column));
+        newRecord.removeNullValueField(column);
       }
     }
   }
+
+  private void setMergedValue(GenericRow newRecord, String column, Object mergedValue) {
+    if (mergedValue != null) {
+      // remove null value field if it was set
+      newRecord.removeNullValueField(column);
+      newRecord.putValue(column, mergedValue);
+    } else {
+      // if column exists but mapped to a null value then merger result was a null value
+      newRecord.addNullValueField(column);
+      newRecord.putValue(column, null);

Review Comment:
   I might not be fully aware of the null value support feature; let me update it to use `putDefaultNullValue()`. Thank you. I don't foresee a scenario that needs an actual null value here.
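
A small sketch of the null handling this comment settles on, assuming a hypothetical `RowSketch` class that mimics a row with null tracking. The method names mirror the ones mentioned in the thread, but the class itself and the `defaultNullValue` parameter are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a row with null tracking; Pinot's GenericRow is richer than this.
final class RowSketch {
  private final Map<String, Object> values = new HashMap<>();
  private final Set<String> nullFields = new HashSet<>();

  void putValue(String column, Object value) {
    values.put(column, value);
  }

  // Stores the default in place of null and remembers that the column is logically null.
  void putDefaultNullValue(String column, Object defaultValue) {
    values.put(column, defaultValue);
    nullFields.add(column);
  }

  void removeNullValueField(String column) {
    nullFields.remove(column);
  }

  Object getValue(String column) {
    return values.get(column);
  }

  boolean isNullValue(String column) {
    return nullFields.contains(column);
  }

  // Applies one merged value; defaultNullValue is assumed to come from the column's schema definition.
  static void setMergedValue(RowSketch row, String column, Object mergedValue, Object defaultNullValue) {
    if (mergedValue != null) {
      row.removeNullValueField(column);
      row.putValue(column, mergedValue);
    } else {
      row.putDefaultNullValue(column, defaultNullValue);
    }
  }

  public static void main(String[] args) {
    RowSketch row = new RowSketch();
    setMergedValue(row, "clicks", null, 0);
    System.out.println(row.getValue("clicks") + " null=" + row.isNullValue("clicks"));
  }
}
```

The design choice here is that the stored value is never a raw `null`: the row keeps a usable default and records nullness separately.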





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1552327323


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -32,76 +31,79 @@
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
-    for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
-        } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
+    if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) {

Review Comment:
   I wanted to keep the logic for `PartialUpsertColumnarMerger` (the behaviour so far) unmodified, and I was handling nulls incorrectly for the custom merger. I have removed the differentiation and now use `putDefaultNullValue()` for null handling, based on another [review comment](https://github.com/apache/pinot/pull/11983#discussion_r1550951066). `PartialUpsertColumnarMerger` will never produce a null merge result.
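
A simplified sketch of a single merge path with no per-merger branching, as described above. It assumes rows are plain maps and uses a hypothetical `RowMerger` interface; the real handler works on `LazyRow` and `GenericRow`, so treat this only as an outline of the control flow.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical outline of the unified flow; rows are modelled as plain maps for brevity.
final class MergeFlowSketch {
  interface RowMerger {
    void merge(Map<String, Object> prev, Map<String, Object> next, Map<String, Object> result);
  }

  static void merge(RowMerger merger, Set<String> primaryKeys, Set<String> comparisonColumns,
      Map<String, Object> prev, Map<String, Object> next, Map<String, Object> reuseResult) {
    reuseResult.clear();
    merger.merge(prev, next, reuseResult);

    // Apply every merged value, whichever kind of merger produced it.
    for (Map.Entry<String, Object> entry : reuseResult.entrySet()) {
      String column = entry.getKey();
      if (primaryKeys.contains(column) || comparisonColumns.contains(column)) {
        continue; // mergers never modify primary key or comparison columns
      }
      next.put(column, entry.getValue());
    }

    // Carry forward comparison values the new row did not supply.
    for (String column : comparisonColumns) {
      if (next.get(column) == null && prev.get(column) != null) {
        next.put(column, prev.get(column));
      }
    }
  }

  public static void main(String[] args) {
    Map<String, Object> prev = new HashMap<>();
    prev.put("id", "a");
    prev.put("ts", 100L);
    prev.put("clicks", 2);
    Map<String, Object> next = new HashMap<>();
    next.put("id", "a");
    next.put("clicks", 3);
    RowMerger sum = (p, n, out) -> out.put("clicks", (Integer) p.get("clicks") + (Integer) n.get("clicks"));
    merge(sum, Collections.singleton("id"), Collections.singleton("ts"), prev, next, new HashMap<>());
    System.out.println(next);
  }
}
```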





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1552936971


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
     PartialUpsertHandler partialUpsertHandler = null;
     if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
       Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
-      Preconditions.checkArgument(partialUpsertStrategies != null,
+      String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass();
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null,

Review Comment:
   Got it, thank you Jackie.





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1517352187


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -307,7 +309,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
             int currentDocId = recordLocation.getDocId();
             if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
               _reusePreviousRow.init(currentSegment, currentDocId);
-              _partialUpsertHandler.merge(_reusePreviousRow, record);
+              _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult);

Review Comment:
   The `_reuseMergerResult` map is defined in the context of a partition for a consuming segment. If we move it to `PartialUpsertHandler`, we would need to initialise `PartialUpsertHandler` in `BasePartitionUpsertMetadataManager` instead of `BaseTableUpsertMetadataManager`. Moreover, `_reuseMergerResult` would have to be made thread-safe across consuming segments.
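
One way to picture the ownership described above, as a sketch with hypothetical class names: a single stateless handler is shared by the table, and each partition manager owns its own scratch map, so no synchronization is needed. The single-consuming-thread-per-partition assumption is mine, stated for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; this only illustrates who owns the scratch map.
final class SharedMergeHandlerSketch {
  // Holds no mutable state, so one instance can serve every partition of a table.
  void merge(Map<String, Object> prev, Map<String, Object> next, Map<String, Object> reuseResult) {
    reuseResult.clear();
    // Example rule: keep the previous value for any column the new row lacks.
    for (Map.Entry<String, Object> entry : prev.entrySet()) {
      if (next.get(entry.getKey()) == null) {
        reuseResult.put(entry.getKey(), entry.getValue());
      }
    }
    next.putAll(reuseResult);
  }

  public static void main(String[] args) {
    SharedMergeHandlerSketch handler = new SharedMergeHandlerSketch();
    PartitionManagerSketch partition0 = new PartitionManagerSketch(handler);
    Map<String, Object> prev = new HashMap<>();
    prev.put("city", "SF");
    Map<String, Object> next = new HashMap<>();
    next.put("clicks", 1);
    System.out.println(partition0.update(prev, next));
  }
}

final class PartitionManagerSketch {
  private final SharedMergeHandlerSketch handler;
  // Owned by this partition only; assumed to be touched by a single consuming thread.
  private final Map<String, Object> reuseMergerResult = new HashMap<>();

  PartitionManagerSketch(SharedMergeHandlerSketch handler) {
    this.handler = handler;
  }

  Map<String, Object> update(Map<String, Object> prev, Map<String, Object> next) {
    handler.merge(prev, next, reuseMergerResult);
    return next;
  }
}
```

Moving the map into the shared handler would make it shared across partitions, which is where the thread-safety concern in the comment comes from.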





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1401365070


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -106,8 +106,8 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
       Preconditions.checkArgument(partialUpsertStrategies != null,
           "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
       _partialUpsertHandler =
-          new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
-              _comparisonColumns);
+          new PartialUpsertHandler(schema, tableConfig.getValidationConfig().getTimeColumnName(), _comparisonColumns,

Review Comment:
   Can we get `_comparisonColumns` from `upsertConfig`?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -34,72 +33,85 @@
  * Handler for partial-upsert.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final String _timeColumnName;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, String timeColumnName, List<String> comparisonColumns,
+      UpsertConfig upsertConfig) {
+    _timeColumnName = timeColumnName;
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
+  }
+
+  @VisibleForTesting
+  public PartialUpsertHandler(Schema schema, String timeColumnName, List<String> comparisonColumns,
+      UpsertConfig upsertConfig,
+      PartialUpsertMerger partialUpsertMerger) {
+    _timeColumnName = timeColumnName;
+    _comparisonColumns = comparisonColumns;
+    _primaryKeyColumns = schema.getPrimaryKeyColumns();
+
+    _partialUpsertMerger = partialUpsertMerger;
   }
 
   /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
+   * Handles partial upsert of a new row.
+   * <p>
+   * A table can be configured to apply merge logic per column to merge previous column value and new column value
+   * based on strategies in table config. Or, one can define a custom merger by implementing
+   * {@link org.apache.pinot.segment.local.upsert.merger.BasePartialUpsertMerger}. The custom merger can apply merge
+   * logic between previous and new row at once. In table config specify the custom implementation class name:
+   * UpsertConfig.rowMergerCustomImplementation.
+   * <p>
+   * PartialUpsertHandler will update column values for new row which are present in the reuseMergerResult map. Any
+   * column name missing in the map will not be modified. Primary key, time and comparison columns will be unmodified.
    *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
+   * @param prevRecord
+   * @param newRecord
+   * @param reuseMergerResult
    */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
+
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // no merger to apply on time column
+      // since time column can be comparison column, do this check after comparison column check
+      if (column.equals(_timeColumnName)) {

Review Comment:
   Is this behavior different from the existing behavior?
   Currently, the time column is not excluded from the column-level merging logic if we set a comparison column other than the time column.





Re: [PR] Custom merger [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on PR #11983:
URL: https://github.com/apache/pinot/pull/11983#issuecomment-1809425452

   Please add a description to the PR, including what support is added, how to configure it, etc.




Re: [PR] pluggable partial upsert merger [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1411467937


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -34,72 +33,85 @@
  * Handler for partial-upsert.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final String _timeColumnName;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, String timeColumnName, List<String> comparisonColumns,
+      UpsertConfig upsertConfig) {
+    _timeColumnName = timeColumnName;
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
+  }
+
+  @VisibleForTesting
+  public PartialUpsertHandler(Schema schema, String timeColumnName, List<String> comparisonColumns,
+      UpsertConfig upsertConfig,
+      PartialUpsertMerger partialUpsertMerger) {
+    _timeColumnName = timeColumnName;
+    _comparisonColumns = comparisonColumns;
+    _primaryKeyColumns = schema.getPrimaryKeyColumns();
+
+    _partialUpsertMerger = partialUpsertMerger;
   }
 
   /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
+   * Handles partial upsert of a new row.
+   * <p>
+   * A table can be configured to apply merge logic per column to merge previous column value and new column value
+   * based on strategies in table config. Or, one can define a custom merger by implementing
+   * {@link org.apache.pinot.segment.local.upsert.merger.BasePartialUpsertMerger}. The custom merger can apply merge
+   * logic between previous and new row at once. In table config specify the custom implementation class name:
+   * UpsertConfig.rowMergerCustomImplementation.
+   * <p>
+   * PartialUpsertHandler will update column values for new row which are present in the reuseMergerResult map. Any
+   * column name missing in the map will not be modified. Primary key, time and comparison columns will be unmodified.
    *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
+   * @param prevRecord
+   * @param newRecord
+   * @param reuseMergerResult
    */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
+
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // no merger to apply on time column
+      // since time column can be comparison column, do this check after comparison column check
+      if (column.equals(_timeColumnName)) {

Review Comment:
   +1. The logic should rely only on the comparison columns.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -106,8 +106,8 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
       Preconditions.checkArgument(partialUpsertStrategies != null,
           "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
       _partialUpsertHandler =
-          new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
-              _comparisonColumns);
+          new PartialUpsertHandler(schema, tableConfig.getValidationConfig().getTimeColumnName(), _comparisonColumns,

Review Comment:
   We should not pass in the time column. We can pass in `_comparisonColumns` and `upsertConfig`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -307,7 +309,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
             int currentDocId = recordLocation.getDocId();
             if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
               _reusePreviousRow.init(currentSegment, currentDocId);
-              _partialUpsertHandler.merge(_reusePreviousRow, record);
+              _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult);

Review Comment:
   I feel this reusable map should be maintained within the `PartialUpsertHandler`. I don't see why it should be part of the interface
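
   A minimal sketch of what this suggestion could look like, assuming rows are simplified to plain maps. `RowMerger` and `HandlerSketch` are hypothetical stand-ins for `PartialUpsertMerger` and `PartialUpsertHandler`, not the PR's actual classes:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical stand-in for the PR's PartialUpsertMerger; rows are plain maps here.
   interface RowMerger {
     void merge(Map<String, Object> prevRow, Map<String, Object> newRow, Map<String, Object> result);
   }

   // Sketch of a handler that owns its scratch map, so callers pass only the two rows.
   class HandlerSketch {
     private final RowMerger _merger;
     // Reused across calls to avoid allocating a map per record; HashMap is not thread-safe.
     private final Map<String, Object> _reuseMergerResult = new HashMap<>();

     HandlerSketch(RowMerger merger) {
       _merger = merger;
     }

     void merge(Map<String, Object> prevRow, Map<String, Object> newRow) {
       _reuseMergerResult.clear();
       _merger.merge(prevRow, newRow, _reuseMergerResult);
       // Apply only the columns the merger produced; all others keep the new row's value.
       newRow.putAll(_reuseMergerResult);
     }

     public static void main(String[] args) {
       // Example merger (assumption): sums the "clicks" column of both rows.
       RowMerger sum = (p, n, out) -> out.put("clicks", (Integer) p.get("clicks") + (Integer) n.get("clicks"));
       Map<String, Object> prevRow = new HashMap<>();
       prevRow.put("clicks", 2);
       Map<String, Object> newRow = new HashMap<>();
       newRow.put("clicks", 3);
       new HandlerSketch(sum).merge(prevRow, newRow);
       System.out.println(newRow);
     }
   }
   ```

   With this shape the caller's signature stays `merge(prevRow, newRow)`, and the scratch map becomes an implementation detail of the handler.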





Re: [PR] Custom merger [pinot]

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11983:
URL: https://github.com/apache/pinot/pull/11983#issuecomment-1804213057

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11983](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (099a670) into [master](https://app.codecov.io/gh/apache/pinot/commit/b5e982333daf7ad2f59fe9c534d8413c0f33ff58?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b5e9823) will **decrease** coverage by `0.07%`.
   > Report is 13 commits behind head on master.
   > The diff coverage is `86.31%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #11983      +/-   ##
   ============================================
   - Coverage     61.43%   61.37%   -0.07%     
   - Complexity      207     1128     +921     
   ============================================
     Files          2385     2388       +3     
     Lines        129149   129248      +99     
     Branches      19994    20010      +16     
   ============================================
   - Hits          79342    79323      -19     
   - Misses        44055    44168     +113     
   - Partials       5752     5757       +5     
   ```
   
   | [Flag](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [custom-integration1](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration1](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [integration2](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [java-11](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.58% <1.05%> (-14.82%)` | :arrow_down: |
   | [java-21](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.61% <85.26%> (-33.66%)` | :arrow_down: |
   | [skip-bytebuffers-false](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `61.37% <86.31%> (-0.03%)` | :arrow_down: |
   | [skip-bytebuffers-true](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `?` | |
   | [temurin](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `61.37% <86.31%> (-0.07%)` | :arrow_down: |
   | [unittests](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `61.37% <86.31%> (-0.06%)` | :arrow_down: |
   | [unittests1](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `46.58% <1.05%> (-0.11%)` | :arrow_down: |
   | [unittests2](https://app.codecov.io/gh/apache/pinot/pull/11983/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | `27.61% <85.26%> (+<0.01%)` | :arrow_up: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Files](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...t/local/upsert/BaseTableUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQmFzZVRhYmxlVXBzZXJ0TWV0YWRhdGFNYW5hZ2VyLmphdmE=) | `77.45% <ø> (-0.22%)` | :arrow_down: |
   | [...t/ConcurrentMapPartitionUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQ29uY3VycmVudE1hcFBhcnRpdGlvblVwc2VydE1ldGFkYXRhTWFuYWdlci5qYXZh) | `85.51% <100.00%> (+0.10%)` | :arrow_up: |
   | [...t/local/upsert/merger/BasePartialUpsertMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL0Jhc2VQYXJ0aWFsVXBzZXJ0TWVyZ2VyLmphdmE=) | `100.00% <100.00%> (ø)` | |
   | [...cal/upsert/merger/PartialUpsertColumnarMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL1BhcnRpYWxVcHNlcnRDb2x1bW5hck1lcmdlci5qYXZh) | `100.00% <100.00%> (ø)` | |
   | [...ent/local/upsert/merger/columnar/AppendMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL2NvbHVtbmFyL0FwcGVuZE1lcmdlci5qYXZh) | `100.00% <ø> (ø)` | |
   | [...ent/local/upsert/merger/columnar/IgnoreMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL2NvbHVtbmFyL0lnbm9yZU1lcmdlci5qYXZh) | `100.00% <ø> (ø)` | |
   | [.../local/upsert/merger/columnar/IncrementMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL2NvbHVtbmFyL0luY3JlbWVudE1lcmdlci5qYXZh) | `40.00% <ø> (ø)` | |
   | [...egment/local/upsert/merger/columnar/MaxMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL2NvbHVtbmFyL01heE1lcmdlci5qYXZh) | `100.00% <ø> (ø)` | |
   | [...egment/local/upsert/merger/columnar/MinMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL2NvbHVtbmFyL01pbk1lcmdlci5qYXZh) | `100.00% <ø> (ø)` | |
   | [.../local/upsert/merger/columnar/OverwriteMerger.java](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvbWVyZ2VyL2NvbHVtbmFyL092ZXJ3cml0ZU1lcmdlci5qYXZh) | `100.00% <ø> (ø)` | |
   | ... and [5 more](https://app.codecov.io/gh/apache/pinot/pull/11983?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | |
   
   ... and [49 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11983/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on PR #11983:
URL: https://github.com/apache/pinot/pull/11983#issuecomment-1822200867

   > Please add some description to the PR, including what support is added, how to configure it etc.
   
   @Jackie-Jiang, please review; I have added the necessary details.




Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1524914532


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -18,89 +18,82 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
-import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger;
+import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final PartialUpsertColumnMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
+    _defaultPartialUpsertMerger =
+        PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy());
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // use merged column value from result map
+      if (reuseMergerResult.containsKey(column)) {
+        Object mergedValue = reuseMergerResult.get(column);
+        if (mergedValue != null) {
+          // remove null value field if it was set
+          newRecord.removeNullValueField(column);
+          newRecord.putValue(column, mergedValue);
         } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+          // if column exists but mapped to a null value then merger result was a null value
+          newRecord.addNullValueField(column);
+          newRecord.putValue(column, null);
         }
+      } else if (!(_partialUpsertMerger instanceof PartialUpsertColumnarMerger)) {
+        // PartialUpsertColumnMerger already handles default merger but for any custom implementations
+        // non merged columns need to be applied with default merger
+        newRecord.putValue(column,
+            _defaultPartialUpsertMerger.merge(prevRecord.getValue(column), newRecord.getValue(column)));

Review Comment:
   Good point. This was done with the intention of keeping the logic in the merger class minimal, but the optimization on map iteration is a better option. Let me update this.
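
   The reviewer's original suggestion is not quoted in this message, so the following is only a sketch of one reading of "map iteration": walk the entries the merger produced instead of every column of the previous row. Rows are simplified to plain maps, and `ResultMapApplySketch`, `apply` and `protectedColumns` are hypothetical names, not part of the PR:

   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   class ResultMapApplySketch {
     // Copies merged values into newRow by visiting only the entries in mergerResult.
     // Protected columns (primary key and comparison columns) are never overwritten here.
     static void apply(Map<String, Object> mergerResult, Set<String> protectedColumns,
         Map<String, Object> newRow) {
       for (Map.Entry<String, Object> entry : mergerResult.entrySet()) {
         if (protectedColumns.contains(entry.getKey())) {
           continue;
         }
         // A key mapped to null means the merger explicitly produced a null value.
         newRow.put(entry.getKey(), entry.getValue());
       }
     }

     public static void main(String[] args) {
       Map<String, Object> mergerResult = new HashMap<>();
       mergerResult.put("id", 99);
       mergerResult.put("clicks", 5);
       Map<String, Object> newRow = new HashMap<>();
       newRow.put("id", 1);
       newRow.put("clicks", 3);
       apply(mergerResult, Collections.singleton("id"), newRow);
       System.out.println(newRow);
     }
   }
   ```

   The cost of the loop then scales with the number of merged columns rather than with the table width.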





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1520578347


##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -51,6 +51,9 @@ public enum Strategy {
   @JsonPropertyDescription("default upsert strategy for partial mode")
   private Strategy _defaultPartialUpsertStrategy = Strategy.OVERWRITE;
 
+  @JsonPropertyDescription("Class name for custom row merger implementation")
+  private String _rowMergerCustomImplementation;

Review Comment:
   Suggest renaming it to `partialUpsertMergerClass`



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMerger.java:
##########
@@ -18,13 +18,28 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
+import java.util.Map;
+import org.apache.pinot.segment.local.segment.readers.LazyRow;
+import org.apache.pinot.spi.data.readers.GenericRow;
+
+
+/**
+ * Merger previously persisted row with the new incoming row.
+ * <p>
+ * Implement this interface to define logic to merge rows. {@link LazyRow} provides a row-like abstraction
+ * to read previously persisted row by lazily loading column values if needed. For automatic plugging of the
+ * interface via {@link org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory}
+ * implement {@link BasePartialUpsertMerger}
+ */
 public interface PartialUpsertMerger {
+
   /**
-   * Handle partial upsert merge.
-   *
-   * @param previousValue the value of given field from the last derived full record during ingestion.
-   * @param currentValue the value of given field from the new consumed record.
-   * @return a new value after merge
+   * Merge previous row with new incoming row and persist the merged results per column in the provided
+   * mergerResult map. {@link org.apache.pinot.segment.local.upsert.PartialUpsertHandler} ensures the primary key and
+   * comparison columns are not modified, comparison columns are merged and only the latest non-null values are stored.
+   * @param prevRecord
+   * @param newRecord
+   * @param mergerResult
    */
-  Object merge(Object previousValue, Object currentValue);
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> mergerResult);

Review Comment:
   (minor) Remove `public`
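
   For context, abstract methods declared in a Java interface are implicitly `public`, so the modifier is redundant. A small sketch that checks this via reflection; `MergerSketch` and `ImplicitPublicCheck` are hypothetical names, and rows are simplified to plain maps:

   ```java
   import java.lang.reflect.Method;
   import java.lang.reflect.Modifier;
   import java.util.Map;

   // Hypothetical stand-in for the PR's PartialUpsertMerger interface.
   interface MergerSketch {
     // No modifier written: the method is still public.
     void merge(Map<String, Object> prevRow, Map<String, Object> newRow, Map<String, Object> mergerResult);
   }

   class ImplicitPublicCheck {
     // Returns true if the declared merge method carries the public modifier.
     static boolean mergeIsPublic() {
       try {
         Method method = MergerSketch.class.getDeclaredMethod("merge", Map.class, Map.class, Map.class);
         return Modifier.isPublic(method.getModifiers());
       } catch (NoSuchMethodException e) {
         return false;
       }
     }

     public static void main(String[] args) {
       System.out.println(mergeIsPublic());
     }
   }
   ```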



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -18,89 +18,82 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
-import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger;
+import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final PartialUpsertColumnMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
+    _defaultPartialUpsertMerger =
+        PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy());
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // use merged column value from result map
+      if (reuseMergerResult.containsKey(column)) {
+        Object mergedValue = reuseMergerResult.get(column);
+        if (mergedValue != null) {
+          // remove null value field if it was set
+          newRecord.removeNullValueField(column);
+          newRecord.putValue(column, mergedValue);
         } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+          // if column exists but mapped to a null value then merger result was a null value
+          newRecord.addNullValueField(column);
+          newRecord.putValue(column, null);
         }
+      } else if (!(_partialUpsertMerger instanceof PartialUpsertColumnarMerger)) {
+        // PartialUpsertColumnMerger already handles default merger but for any custom implementations
+        // non merged columns need to be applied with default merger
+        newRecord.putValue(column,
+            _defaultPartialUpsertMerger.merge(prevRecord.getValue(column), newRecord.getValue(column)));

Review Comment:
   The default strategy is column-based. Do we want to apply it to other mergers? I feel it is more intuitive if all the merge logic is handled within the merger.
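
   One way to read this suggestion is sketched below. It is only an illustration under stated assumptions: `RowMergerSketch`, `SelfContainedMergerSketch` and the `clicks` column are hypothetical names, and plain maps stand in for `LazyRow` and `GenericRow`. The merger picks its own fallback for columns it does not customize, so the handler would not need an `instanceof` check to decide whether to apply a default.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a row-level merger; plain maps replace LazyRow and GenericRow.
interface RowMergerSketch {
  void merge(Map<String, Object> prevRow, Map<String, Object> newRow, Map<String, Object> result);
}

// A custom merger that owns all of its logic, including the fallback for columns it does not customize.
class SelfContainedMergerSketch implements RowMergerSketch {
  @Override
  public void merge(Map<String, Object> prevRow, Map<String, Object> newRow, Map<String, Object> result) {
    for (Map.Entry<String, Object> entry : prevRow.entrySet()) {
      String column = entry.getKey();
      Object prevValue = entry.getValue();
      Object newValue = newRow.get(column);
      if ("clicks".equals(column) && prevValue instanceof Long && newValue instanceof Long) {
        // Custom logic for one (hypothetical) column: accumulate the counter.
        result.put(column, (Long) prevValue + (Long) newValue);
      } else {
        // Fallback chosen by the merger itself: keep the new value unless it is null.
        result.put(column, newValue != null ? newValue : prevValue);
      }
    }
  }

  // Convenience wrapper for trying the sketch out.
  static Map<String, Object> mergeRows(Map<String, Object> prevRow, Map<String, Object> newRow) {
    Map<String, Object> result = new HashMap<>();
    new SelfContainedMergerSketch().merge(prevRow, newRow, result);
    return result;
  }
}
```

   With this shape the caller only copies `result` into the new record; whether the PR ends up with this split is for the author and reviewers to decide.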



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -18,89 +18,82 @@
  */
 package org.apache.pinot.segment.local.upsert;
 
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.segment.readers.LazyRow;
-import org.apache.pinot.segment.local.upsert.merger.OverwriteMerger;
+import org.apache.pinot.segment.local.upsert.merger.PartialUpsertColumnarMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
+import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMerger;
+import org.apache.pinot.segment.local.upsert.merger.columnar.PartialUpsertColumnMergerFactory;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 
 
 /**
  * Handler for partial-upsert.
+ *
+ * This class is responsible for merging the new record with the previous record.
+ * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column,
+ * it uses the default merge strategy.
+ *
+ * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}.
+ * If a merger for row is defined then it takes precedence and ignores column mergers.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final PartialUpsertColumnMerger _defaultPartialUpsertMerger;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) {
+    _defaultPartialUpsertMerger =
+        PartialUpsertColumnMergerFactory.getMerger(upsertConfig.getDefaultPartialUpsertStrategy());
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
   }
 
-  /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
-   *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
-   */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // use merged column value from result map
+      if (reuseMergerResult.containsKey(column)) {
+        Object mergedValue = reuseMergerResult.get(column);
+        if (mergedValue != null) {
+          // remove null value field if it was set
+          newRecord.removeNullValueField(column);
+          newRecord.putValue(column, mergedValue);
         } else {
-          // Overwrite mergers.
-          // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value
-          // (2) Otherwise, if previous is not null, init columnReader and use the previous value.
-          if (newRecord.isNullValue(column)) {
-            Object prevValue = prevRecord.getValue(column);
-            if (prevValue != null) {
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            }
-          }
+          // if column exists but mapped to a null value then merger result was a null value
+          newRecord.addNullValueField(column);
+          newRecord.putValue(column, null);
         }
+      } else if (!(_partialUpsertMerger instanceof PartialUpsertColumnarMerger)) {
+        // PartialUpsertColumnMerger already handles default merger but for any custom implementations
+        // non merged columns need to be applied with default merger
+        newRecord.putValue(column,
+            _defaultPartialUpsertMerger.merge(prevRecord.getValue(column), newRecord.getValue(column)));

Review Comment:
   If we do not handle the default merge here, we can iterate over the merger result instead of every column, which reduces map lookups.
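
   A minimal sketch of that idea, assuming the merger never puts primary key or comparison columns into its result. `MergeResultApplierSketch` is a hypothetical helper; a plain map and a set of null field names stand in for `GenericRow` and its null-value bookkeeping.

```java
import java.util.Map;
import java.util.Set;

class MergeResultApplierSketch {
  // Copies merged values into the new row by walking only the result entries,
  // so each column costs one iteration step instead of containsKey() plus get().
  static void apply(Map<String, Object> mergerResult, Map<String, Object> newRow, Set<String> nullFields) {
    for (Map.Entry<String, Object> entry : mergerResult.entrySet()) {
      String column = entry.getKey();
      Object mergedValue = entry.getValue();
      if (mergedValue != null) {
        // The merged value is present, so the column is no longer null.
        nullFields.remove(column);
      } else {
        // A key mapped to null means the merger produced a null value.
        nullFields.add(column);
      }
      newRow.put(column, mergedValue);
    }
  }
}
```

   Columns absent from the result are left untouched, which matches the case where the handler applies no default merge of its own.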





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on PR #11983:
URL: https://github.com/apache/pinot/pull/11983#issuecomment-2030364352

   @Jackie-Jiang @deemoliu, please review the updated changes.




Re: [PR] pluggable partial upsert merger [pinot]

Posted by "rohityadav1993 (via GitHub)" <gi...@apache.org>.
rohityadav1993 commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1551631254


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java:
##########
@@ -18,39 +18,50 @@
  */
 package org.apache.pinot.segment.local.upsert.merger;
 
+import java.lang.reflect.InvocationTargetException;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.spi.config.table.UpsertConfig;
 
 
 public class PartialUpsertMergerFactory {
+
   private PartialUpsertMergerFactory() {
   }
 
-  private static final AppendMerger APPEND_MERGER = new AppendMerger();
-  private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger();
-  private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger();
-  private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger();
-  private static final MaxMerger MAX_MERGER = new MaxMerger();
-  private static final MinMerger MIN_MERGER = new MinMerger();
-  private static final UnionMerger UNION_MERGER = new UnionMerger();
-
-  public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) {
-    switch (strategy) {
-      case APPEND:
-        return APPEND_MERGER;
-      case INCREMENT:
-        return INCREMENT_MERGER;
-      case IGNORE:
-        return IGNORE_MERGER;
-      case MAX:
-        return MAX_MERGER;
-      case MIN:
-        return MIN_MERGER;
-      case OVERWRITE:
-        return OVERWRITE_MERGER;
-      case UNION:
-        return UNION_MERGER;
-      default:
-        throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy);
+  /**
+   * Initialise the default partial upsert merger or initialise a custom implementation from a given class name in
+   * config
+   * @param primaryKeyColumns
+   * @param comparisonColumns
+   * @param upsertConfig
+   * @return
+   */
+  public static PartialUpsertMerger getPartialUpsertMerger(List<String> primaryKeyColumns,
+      List<String> comparisonColumns, UpsertConfig upsertConfig) {
+    PartialUpsertMerger partialUpsertMerger;
+    String customRowMerger = upsertConfig.getPartialUpsertMergerClass();
+    // If a custom implementation is provided in config, initialize an implementation and return.
+    if (StringUtils.isNotBlank(customRowMerger)) {
+      try {
+        Class<?> partialUpsertMergerClass = Class.forName(customRowMerger);
+        if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) {

Review Comment:
   `BasePartialUpsertMerger` defines a constructor that is used to initialize a custom merger class; the custom class declares a constructor with the same signature:
   `(PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class)
                   .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig);`
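
   The pattern can be sketched in isolation as follows. All `Sketch*` types are hypothetical stand-ins (for `UpsertConfig`, `BasePartialUpsertMerger` and the factory) so that the example compiles on its own; only the reflection calls mirror the snippet above.

```java
import java.util.List;

// Hypothetical stand-in for UpsertConfig, carrying only the merger class name.
class SketchConfig {
  final String mergerClassName;

  SketchConfig(String mergerClassName) {
    this.mergerClassName = mergerClassName;
  }
}

// Hypothetical stand-in for the base class whose constructor signature custom mergers must repeat.
abstract class SketchBaseMerger {
  protected final List<String> primaryKeyColumns;
  protected final List<String> comparisonColumns;

  protected SketchBaseMerger(List<String> primaryKeyColumns, List<String> comparisonColumns, SketchConfig config) {
    this.primaryKeyColumns = primaryKeyColumns;
    this.comparisonColumns = comparisonColumns;
  }
}

// A custom merger: its public constructor matches the signature the factory looks up.
class SketchCustomMerger extends SketchBaseMerger {
  public SketchCustomMerger(List<String> primaryKeyColumns, List<String> comparisonColumns, SketchConfig config) {
    super(primaryKeyColumns, comparisonColumns, config);
  }
}

class SketchMergerFactory {
  static SketchBaseMerger create(List<String> primaryKeyColumns, List<String> comparisonColumns,
      SketchConfig config) {
    try {
      Class<?> mergerClass = Class.forName(config.mergerClassName);
      // Reject classes that do not extend the base class before trying to construct them.
      if (!SketchBaseMerger.class.isAssignableFrom(mergerClass)) {
        throw new IllegalArgumentException(config.mergerClassName + " must extend SketchBaseMerger");
      }
      return (SketchBaseMerger) mergerClass.getConstructor(List.class, List.class, SketchConfig.class)
          .newInstance(primaryKeyColumns, comparisonColumns, config);
    } catch (ReflectiveOperationException e) {
      // Covers a missing class, a missing constructor and failures inside the constructor.
      throw new RuntimeException("Could not instantiate merger: " + config.mergerClassName, e);
    }
  }
}
```

   Note that `getConstructor` only finds public constructors, so in this sketch a custom merger with a non-public constructor would fail with `NoSuchMethodException`.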





Re: [PR] pluggable partial upsert merger [pinot]

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1552669242


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
     PartialUpsertHandler partialUpsertHandler = null;
     if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
       Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies();
-      Preconditions.checkArgument(partialUpsertStrategies != null,
+      String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass();
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null,

Review Comment:
   I've applied a commit to demonstrate the idea. The exception will be thrown from the constructor.


