Posted to commits@pinot.apache.org by "Jackie-Jiang (via GitHub)" <gi...@apache.org> on 2023/12/01 00:44:00 UTC

Re: [PR] pluggable partial upsert merger [pinot]

Jackie-Jiang commented on code in PR #11983:
URL: https://github.com/apache/pinot/pull/11983#discussion_r1411467937


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java:
##########
@@ -34,72 +33,85 @@
  * Handler for partial-upsert.
  */
 public class PartialUpsertHandler {
-  // _column2Mergers maintains the mapping of merge strategies per columns.
-  private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>();
-  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final String _timeColumnName;
   private final List<String> _comparisonColumns;
   private final List<String> _primaryKeyColumns;
+  private final PartialUpsertMerger _partialUpsertMerger;
 
-  public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies,
-      UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) {
-    _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+  public PartialUpsertHandler(Schema schema, String timeColumnName, List<String> comparisonColumns,
+      UpsertConfig upsertConfig) {
+    _timeColumnName = timeColumnName;
     _comparisonColumns = comparisonColumns;
     _primaryKeyColumns = schema.getPrimaryKeyColumns();
 
-    for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) {
-      _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue()));
-    }
+    _partialUpsertMerger =
+        PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig);
+  }
+
+  @VisibleForTesting
+  public PartialUpsertHandler(Schema schema, String timeColumnName, List<String> comparisonColumns,
+      UpsertConfig upsertConfig,
+      PartialUpsertMerger partialUpsertMerger) {
+    _timeColumnName = timeColumnName;
+    _comparisonColumns = comparisonColumns;
+    _primaryKeyColumns = schema.getPrimaryKeyColumns();
+
+    _partialUpsertMerger = partialUpsertMerger;
   }
 
   /**
-   * Merges records and returns the merged record.
-   * We used a map to indicate all configured fields for partial upsert. For these fields
-   * (1) If the prev value is null, return the new value
-   * (2) If the prev record is not null, the new value is null, return the prev value.
-   * (3) If neither values are not null, then merge the value and return.
-   * For un-configured fields, they are using default override behavior, regardless null values.
+   * Handles partial upsert of a new row.
+   * <p>
+   * A table can be configured to apply merge logic per column to merge previous column value and new column value
+   * based on strategies in table config. Or, one can define a custom merger by implementing
+   * {@link org.apache.pinot.segment.local.upsert.merger.BasePartialUpsertMerger}. The custom merger can apply merge
+   * logic between previous and new row at once. In table config specify the custom implementation class name:
+   * UpsertConfig.rowMergerCustomImplementation.
+   * <p>
+   * PartialUpsertHandler will update column values for new row which are present in the reuseMergerResult map. Any
+   * column name missing in the map will not be modified. Primary key, time and comparison columns will be unmodified.
    *
-   * For example, overwrite merger will only override the prev value if the new value is not null.
-   * Null values will override existing values if not configured. They can be ignored by using ignoreMerger.
-   *
-   * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for
-   *                   re-reads.
-   * @param newRecord the new consumed record.
+   * @param prevRecord
+   * @param newRecord
+   * @param reuseMergerResult
    */
-  public void merge(LazyRow prevRecord, GenericRow newRecord) {
+  public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) {
+    reuseMergerResult.clear();
+
+    // merger current row with previously indexed row
+    _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult);
+
     for (String column : prevRecord.getColumnNames()) {
-      if (!_primaryKeyColumns.contains(column)) {
-        PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
-        // Non-overwrite mergers
-        // (1) If the value of the previous is null value, skip merging and use the new value
-        // (2) Else If the value of new value is null, use the previous value (even for comparison columns).
-        // (3) Else If the column is not a comparison column, we applied the merged value to it.
-        if (!(merger instanceof OverwriteMerger)) {
-          Object prevValue = prevRecord.getValue(column);
-          if (prevValue != null) {
-            if (newRecord.isNullValue(column)) {
-              // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of
-              // using
-              // multiple comparison columns. We never apply a merge function to it, rather we just take any/all
-              // non-null comparison column values from the previous record, and the sole non-null comparison column
-              // value from the new record.
-              newRecord.putValue(column, prevValue);
-              newRecord.removeNullValueField(column);
-            } else if (!_comparisonColumns.contains(column)) {
-              newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column)));
-            }
-          }
+
+      // no merger to apply on primary key columns
+      if (_primaryKeyColumns.contains(column)) {
+        continue;
+      }
+      // no merger to apply on comparison key column, use previous row's value if current is null
+      if (_comparisonColumns.contains(column)) {
+        if (newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) {
+          newRecord.putValue(column, prevRecord.getValue(column));
+          newRecord.removeNullValueField(column);
+        }
+        continue;
+      }
+
+      // no merger to apply on time column
+      // since time column can be comparison column, do this check after comparison column check
+      if (column.equals(_timeColumnName)) {

Review Comment:
   +1. The logic should rely only on the comparison columns; the time column needs no special handling here.
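
   For illustration, a minimal sketch of the loop without the time-column branch. Plain maps stand in for `LazyRow` and `GenericRow`, and `ComparisonOnlySketch.applyMergerResult` is a hypothetical helper, not part of the PR or of Pinot. It assumes, as the Javadoc in this diff states, that only columns present in the merger result are modified:

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the handler loop: rows are plain maps and a null
// map value plays the role of a null column value.
final class ComparisonOnlySketch {
  private ComparisonOnlySketch() {
  }

  static void applyMergerResult(Map<String, Object> prevRow, Map<String, Object> newRow,
      Map<String, Object> mergerResult, List<String> primaryKeyColumns, List<String> comparisonColumns) {
    for (String column : prevRow.keySet()) {
      // Primary key columns are never merged.
      if (primaryKeyColumns.contains(column)) {
        continue;
      }
      // Comparison columns are never merged; carry the previous value over
      // only when the new row has none.
      if (comparisonColumns.contains(column)) {
        if (newRow.get(column) == null && prevRow.get(column) != null) {
          newRow.put(column, prevRow.get(column));
        }
        continue;
      }
      // Every other column (a time column included, unless it is also a
      // comparison column) takes the merged value if the merger produced one.
      if (mergerResult.containsKey(column)) {
        newRow.put(column, mergerResult.get(column));
      }
    }
  }
}
```

   With this shape the handler has no reason to know the time column name: a time column that is also a comparison column is covered by the comparison-column branch.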



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java:
##########
@@ -106,8 +106,8 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD
       Preconditions.checkArgument(partialUpsertStrategies != null,
           "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType);
       _partialUpsertHandler =
-          new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(),
-              _comparisonColumns);
+          new PartialUpsertHandler(schema, tableConfig.getValidationConfig().getTimeColumnName(), _comparisonColumns,

Review Comment:
   We should not pass in the time column. We can pass in just `_comparisonColumns` and `upsertConfig`.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -307,7 +309,7 @@ protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
             int currentDocId = recordLocation.getDocId();
             if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
               _reusePreviousRow.init(currentSegment, currentDocId);
-              _partialUpsertHandler.merge(_reusePreviousRow, record);
+              _partialUpsertHandler.merge(_reusePreviousRow, record, _reuseMergerResult);

Review Comment:
   I feel this reusable map should be maintained within the `PartialUpsertHandler`. I don't see why it should be part of the interface.
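
   Roughly what I have in mind, as a sketch only. `RowMerger` and `HandlerSketch` are hypothetical, simplified stand-ins (plain maps instead of `LazyRow` and `GenericRow`), and the sketch assumes `merge` is called by one thread at a time per handler; if a handler is shared across consuming threads, the map would need to be thread-local or the handler created per partition:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified merger contract: write merged values into `result`.
interface RowMerger {
  void merge(Map<String, Object> prevRow, Map<String, Object> newRow, Map<String, Object> result);
}

// Hypothetical handler that owns the reusable result map, so callers only
// pass the previous and the new row.
final class HandlerSketch {
  private final RowMerger _merger;
  // Reused across calls to avoid allocating a map per record.
  // Assumption: not accessed concurrently.
  private final Map<String, Object> _reuseMergerResult = new HashMap<>();

  HandlerSketch(RowMerger merger) {
    _merger = merger;
  }

  void merge(Map<String, Object> prevRow, Map<String, Object> newRow) {
    // Clear first so values from the previous record cannot leak into this one.
    _reuseMergerResult.clear();
    _merger.merge(prevRow, newRow, _reuseMergerResult);
    // Simplified: the primary key and comparison column checks are omitted here.
    newRow.putAll(_reuseMergerResult);
  }
}
```

   The call site in `doUpdateRecord` then stays `_partialUpsertHandler.merge(_reusePreviousRow, record)`, and the map becomes an implementation detail of the handler.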



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

