You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/01 17:45:14 UTC

[pinot] branch master updated: Keeps nullness attributes of merged in comparison column values (#10704)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dbd9e23fe Keeps nullness attributes of merged in comparison column values (#10704)
6dbd9e23fe is described below

commit 6dbd9e23fef8d326fa2f2467ae8a10e2b3fdb428
Author: Evan Galpin <eg...@users.noreply.github.com>
AuthorDate: Thu Jun 1 10:45:07 2023 -0700

    Keeps nullness attributes of merged in comparison column values (#10704)
---
 .../segment/local/upsert/ComparisonColumns.java    |  41 ++++-
 .../segment/local/upsert/PartialUpsertHandler.java |   7 +-
 .../pinot/segment/local/upsert/UpsertUtils.java    |  10 +-
 .../mutable/MutableSegmentImplUpsertTest.java      |   3 +
 .../local/upsert/ComparisonColumnsTest.java        | 171 +++++++++++++++++++++
 5 files changed, 225 insertions(+), 7 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
index 5d40e5a350..174e441a20 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
@@ -23,6 +23,7 @@ package org.apache.pinot.segment.local.upsert;
 public class ComparisonColumns implements Comparable<ComparisonColumns> {
   private final Comparable[] _values;
   private final int _comparableIndex;
+  public static final int SEALED_SEGMENT_COMPARISON_INDEX = -1;
 
   public ComparisonColumns(Comparable[] values, int comparableIndex) {
     _values = values;
@@ -37,10 +38,46 @@ public class ComparisonColumns implements Comparable<ComparisonColumns> {
     return _comparableIndex;
   }
 
+  /**
+   * Compares comparison values read back from a sealed segment, column by column.
+   *
+   * <p>The first column whose values differ decides the outcome: a non-null value always wins over a null one, and
+   * two non-null values are ordered by {@link Comparable#compareTo}. Columns that are null on both sides are skipped.
+   * Unlike the realtime path of {@link #compareTo}, the values of neither instance are modified.
+   *
+   * @param other the values to compare against; assumed to have at least as many columns as this instance
+   * @return negative, zero, or positive as this instance orders before, equal to, or after {@code other}
+   */
+  public int compareToSealed(ComparisonColumns other) {
+    for (int col = 0; col < _values.length; col++) {
+      Comparable mine = _values[col];
+      Comparable theirs = other.getValues()[col];
+      boolean mineIsNull = mine == null;
+      boolean theirsIsNull = theirs == null;
+      if (mineIsNull != theirsIsNull) {
+        // Exactly one side is null; the side that holds a value wins.
+        return mineIsNull ? -1 : 1;
+      }
+      if (mineIsNull) {
+        continue;
+      }
+      int result = mine.compareTo(theirs);
+      if (result != 0) {
+        return result;
+      }
+    }
+    return 0;
+  }
+
   @Override
   public int compareTo(ComparisonColumns other) {
-    // _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility.
-    // There is no attempt to guarantee behavior in the case where there are multiple non-null values
+    if (_comparableIndex == SEALED_SEGMENT_COMPARISON_INDEX) {
+      return compareToSealed(other);
+    }
+
+    // _values should have at most one non-null comparison value for newly ingested data. If not, it is
+    // the user's responsibility. There is no attempt to guarantee behavior in the case where there are multiple
+    // non-null values.
     int comparisonResult;
 
     Comparable comparisonValue = _values[_comparableIndex];
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 86c268e509..483ec6d6bb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -74,7 +74,12 @@ public class PartialUpsertHandler {
             // comparison column values from the previous record, and the sole non-null comparison column value from
             // the new record.
             newRecord.putValue(column, previousRecord.getValue(column));
-            newRecord.removeNullValueField(column);
+            if (!_comparisonColumns.contains(column)) {
+              // Comparison columns still take the previous record's value (above), but their null-value field is kept
+              // so this record still shows which comparison column was actually non-null. That lets the same
+              // comparisons be re-evaluated when reading a segment and during steady-state stream ingestion.
+              newRecord.removeNullValueField(column);
+            }
           } else if (!_comparisonColumns.contains(column)) {
             PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
             newRecord.putValue(column,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index acda88a27b..6a7d3ed626 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -180,6 +180,7 @@ public class UpsertUtils {
 
     public MultiComparisonColumnReader(IndexSegment segment, List<String> comparisonColumns) {
       _comparisonColumnReaders = new PinotSegmentColumnReader[comparisonColumns.size()];
+
       for (int i = 0; i < comparisonColumns.size(); i++) {
         _comparisonColumnReaders[i] = new PinotSegmentColumnReader(segment, comparisonColumns.get(i));
       }
@@ -190,13 +191,14 @@ public class UpsertUtils {
 
       for (int i = 0; i < _comparisonColumnReaders.length; i++) {
         PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
-        Comparable comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
+        Comparable comparisonValue = null;
+        if (!columnReader.isNull(docId)) {
+          comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
+        }
         comparisonColumns[i] = comparisonValue;
       }
 
-      // Note that the comparable index is negative here to indicate that this instance could be the argument to
-      // ComparisonColumns#compareTo, but should never call compareTo itself.
-      return new ComparisonColumns(comparisonColumns, -1);
+      return new ComparisonColumns(comparisonColumns, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
     }
 
     @Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index c8b38db61e..1138d891c5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -76,6 +76,7 @@ public class MutableSegmentImplUpsertTest {
     _schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
     _tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
+            .setNullHandlingEnabled(true)
             .build();
     _recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
     File jsonFile = new File(dataResourceUrl.getFile());
@@ -138,6 +139,7 @@ public class MutableSegmentImplUpsertTest {
       // Confirm that both comparison column values have made it into the persisted upserted doc
       Assert.assertEquals(1567205397L, _mutableSegmentImpl.getValue(2, "secondsSinceEpoch"));
       Assert.assertEquals(1567205395L, _mutableSegmentImpl.getValue(2, "otherComparisonColumn"));
+      Assert.assertTrue(_mutableSegmentImpl.getDataSource("secondsSinceEpoch").getNullValueVector().isNull(2));
 
       // bb
       Assert.assertFalse(bitmap.contains(4));
@@ -146,6 +148,7 @@ public class MutableSegmentImplUpsertTest {
       // Confirm that comparison column values have made it into the persisted upserted doc
       Assert.assertEquals(1567205396L, _mutableSegmentImpl.getValue(5, "secondsSinceEpoch"));
       Assert.assertEquals(Long.MIN_VALUE, _mutableSegmentImpl.getValue(5, "otherComparisonColumn"));
+      Assert.assertTrue(_mutableSegmentImpl.getDataSource("otherComparisonColumn").getNullValueVector().isNull(5));
     }
   }
 }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java
new file mode 100644
index 0000000000..2fae498b44
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.Arrays;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/** Tests {@link ComparisonColumns} ordering for realtime ingestion and for values read back from sealed segments. */
+public class ComparisonColumnsTest {
+  /** Resets every given array to all-null values between test scenarios. */
+  private void nullFill(Comparable[]... comparables) {
+    Arrays.stream(comparables).forEach(comps -> Arrays.fill(comps, null));
+  }
+
+  @Test
+  public void testRealtimeComparison() {
+    Comparable[] newComparables = new Comparable[3];
+    Comparable[] persistedComparables = new Comparable[3];
+    ComparisonColumns alreadyPersisted = new ComparisonColumns(persistedComparables, 0);
+    ComparisonColumns toBeIngested = new ComparisonColumns(newComparables, 0);
+
+    // reject same col with smaller value
+    newComparables[0] = 1;
+    persistedComparables[0] = 2;
+    int comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, -1);
+
+    // persist same col with equal value
+    nullFill(newComparables, persistedComparables);
+    newComparables[0] = 2;
+    persistedComparables[0] = 2;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 0);
+
+    // persist same col with larger value
+    nullFill(newComparables, persistedComparables);
+    newComparables[0] = 2;
+    persistedComparables[0] = 1;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 1);
+    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, null});
+
+    // persist doc with col which was previously null, even though its value is smaller than the previous non-null col
+    nullFill(newComparables, persistedComparables);
+    toBeIngested = new ComparisonColumns(newComparables, newComparables.length - 1);
+    newComparables[newComparables.length - 1] = 1;
+    persistedComparables[0] = 2;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 1);
+    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, 1});
+
+    // persist new doc where existing doc has multiple non-null comparison values
+    nullFill(newComparables, persistedComparables);
+    toBeIngested = new ComparisonColumns(newComparables, 1);
+    newComparables[1] = 2;
+    Arrays.fill(persistedComparables, 1);
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 1);
+    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{1, 2, 1});
+
+    // reject new doc where existing doc has multiple non-null comparison values
+    nullFill(newComparables, persistedComparables);
+    newComparables[1] = 0;
+    Arrays.fill(persistedComparables, 1);
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, -1);
+  }
+
+  @Test
+  public void testSealedComparison() {
+    // Remember to be cognizant of which scenarios are _actually_ possible in a sealed segment. The way in which docs
+    // are compared during realtime ingestion dictates the possible scenarios of persisted rows. Ex. it is not
+    // possible for 2 docs with the same primary key to have a mutually exclusive set of non-null values; if such a
+    // scenario arose during realtime ingestion, the values would be merged such that the newly persisted doc would
+    // have all non-null comparison values. We should avoid making tests pass for scenarios that are not intended to
+    // be supported.
+    Comparable[] newComparables = new Comparable[3];
+    Comparable[] persistedComparables = new Comparable[3];
+    ComparisonColumns alreadyPersisted =
+        new ComparisonColumns(persistedComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
+    ComparisonColumns toBeIngested =
+        new ComparisonColumns(newComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
+
+    // reject same col with smaller value
+    newComparables[0] = 1;
+    persistedComparables[0] = 2;
+    int comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, -1);
+
+    // persist same col with equal value
+    nullFill(newComparables, persistedComparables);
+    newComparables[0] = 2;
+    persistedComparables[0] = 2;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 0);
+
+    // persist same col with larger value
+    nullFill(newComparables, persistedComparables);
+    newComparables[0] = 2;
+    persistedComparables[0] = 1;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 1);
+
+    // reject doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has 2
+    // null columns. The presence of null columns in one of the docs implies that it must have come before the doc
+    // with non-null columns.
+    nullFill(newComparables, persistedComparables);
+    newComparables[1] = 1;
+    persistedComparables[0] = 1;
+    persistedComparables[2] = 1;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, -1);
+    // Sealed comparison must not merge the persisted doc's non-null values into this doc
+    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{null, 1, null});
+
+    // persist doc with the same non-null columns as the existing doc (more than one, not all) but a larger value
+    nullFill(newComparables, persistedComparables);
+    newComparables[0] = 1;
+    newComparables[2] = 2;
+    persistedComparables[0] = 1;
+    persistedComparables[2] = 1;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 1);
+
+    // persist doc with non-null value where existing doc had null value in same column previously (but multiple
+    // non-null in other columns)
+    nullFill(newComparables, persistedComparables);
+    newComparables[0] = 1;
+    newComparables[1] = 1;
+    newComparables[2] = 1;
+    persistedComparables[0] = 1;
+    persistedComparables[2] = 1;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 1);
+
+    // reject doc where existing doc has all non-null comparison values, but _this_ doc has 2 null values.
+    // The presence of null columns in one of the docs implies that it must have come before the doc with non-null
+    // columns.
+    nullFill(newComparables, persistedComparables);
+    newComparables[1] = 1;
+    Arrays.fill(persistedComparables, 1);
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, -1);
+
+    // Persist doc where existing doc has all non-null comparison values, but _this_ doc has a larger value.
+    nullFill(newComparables, persistedComparables);
+    Arrays.fill(newComparables, 1);
+    Arrays.fill(persistedComparables, 1);
+    newComparables[1] = 2;
+    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+    Assert.assertEquals(comparisonResult, 1);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org