You are viewing a plain-text version of this content. The canonical link to it was a hyperlink ("here") in the original page and is not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/01 17:45:14 UTC
[pinot] branch master updated: Keeps nullness attributes of merged in comparison column values (#10704)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6dbd9e23fe Keeps nullness attributes of merged in comparison column values (#10704)
6dbd9e23fe is described below
commit 6dbd9e23fef8d326fa2f2467ae8a10e2b3fdb428
Author: Evan Galpin <eg...@users.noreply.github.com>
AuthorDate: Thu Jun 1 10:45:07 2023 -0700
Keeps nullness attributes of merged in comparison column values (#10704)
---
.../segment/local/upsert/ComparisonColumns.java | 41 ++++-
.../segment/local/upsert/PartialUpsertHandler.java | 7 +-
.../pinot/segment/local/upsert/UpsertUtils.java | 10 +-
.../mutable/MutableSegmentImplUpsertTest.java | 3 +
.../local/upsert/ComparisonColumnsTest.java | 171 +++++++++++++++++++++
5 files changed, 225 insertions(+), 7 deletions(-)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
index 5d40e5a350..174e441a20 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java
@@ -23,6 +23,7 @@ package org.apache.pinot.segment.local.upsert;
public class ComparisonColumns implements Comparable<ComparisonColumns> {
private final Comparable[] _values;
private final int _comparableIndex;
+ public static final int SEALED_SEGMENT_COMPARISON_INDEX = -1;
public ComparisonColumns(Comparable[] values, int comparableIndex) {
_values = values;
@@ -37,10 +38,46 @@ public class ComparisonColumns implements Comparable<ComparisonColumns> {
return _comparableIndex;
}
+ public int compareToSealed(ComparisonColumns other) {
+ /*
+ - iterate over all columns
+ - if any value in _values is greater than its counterpart in _other._values, keep _values as-is and return 1
+ - if any value in _values is less than its counterpart in _other._values, keep _values as-is and return -1
+ - if all values between the two sets of Comparables are equal (compareTo == 0), keep _values as-is and return 0
+ */
+ for (int i = 0; i < _values.length; i++) {
+ Comparable comparisonValue = _values[i];
+ Comparable otherComparisonValue = other.getValues()[i];
+ if (comparisonValue == null && otherComparisonValue == null) {
+ continue;
+ }
+
+ // Always keep the record with non-null value, or that with the greater comparisonResult
+ if (comparisonValue == null) {
+ // implies comparisonValue == null && otherComparisonValue != null
+ return -1;
+ } else if (otherComparisonValue == null) {
+ // implies comparisonValue != null && otherComparisonValue == null
+ return 1;
+ } else {
+ int comparisonResult = comparisonValue.compareTo(otherComparisonValue);
+ if (comparisonResult != 0) {
+ return comparisonResult;
+ }
+ }
+ }
+ return 0;
+ }
+
@Override
public int compareTo(ComparisonColumns other) {
- // _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility.
- // There is no attempt to guarantee behavior in the case where there are multiple non-null values
+ if (_comparableIndex == SEALED_SEGMENT_COMPARISON_INDEX) {
+ return compareToSealed(other);
+ }
+
+ // _comparisonColumns should only at most one non-null comparison value for newly ingested data. If not, it is
+ // the user's responsibility. There is no attempt to guarantee behavior in the case where there are multiple
+ // non-null values
int comparisonResult;
Comparable comparisonValue = _values[_comparableIndex];
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 86c268e509..483ec6d6bb 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -74,7 +74,12 @@ public class PartialUpsertHandler {
// comparison column values from the previous record, and the sole non-null comparison column value from
// the new record.
newRecord.putValue(column, previousRecord.getValue(column));
- newRecord.removeNullValueField(column);
+ if (!_comparisonColumns.contains(column)) {
+ // Despite wanting to overwrite the values to comparison columns from prior records, we want to
+ // preserve for _this_ record which comparison column was non-null. Doing so will allow us to
+ // re-evaluate the same comparisons when reading a segment and during steady-state stream ingestion
+ newRecord.removeNullValueField(column);
+ }
} else if (!_comparisonColumns.contains(column)) {
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
newRecord.putValue(column,
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index acda88a27b..6a7d3ed626 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -180,6 +180,7 @@ public class UpsertUtils {
public MultiComparisonColumnReader(IndexSegment segment, List<String> comparisonColumns) {
_comparisonColumnReaders = new PinotSegmentColumnReader[comparisonColumns.size()];
+
for (int i = 0; i < comparisonColumns.size(); i++) {
_comparisonColumnReaders[i] = new PinotSegmentColumnReader(segment, comparisonColumns.get(i));
}
@@ -190,13 +191,14 @@ public class UpsertUtils {
for (int i = 0; i < _comparisonColumnReaders.length; i++) {
PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
- Comparable comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
+ Comparable comparisonValue = null;
+ if (!columnReader.isNull(docId)) {
+ comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
+ }
comparisonColumns[i] = comparisonValue;
}
- // Note that the comparable index is negative here to indicate that this instance could be the argument to
- // ComparisonColumns#compareTo, but should never call compareTo itself.
- return new ComparisonColumns(comparisonColumns, -1);
+ return new ComparisonColumns(comparisonColumns, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
}
@Override
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index c8b38db61e..1138d891c5 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -76,6 +76,7 @@ public class MutableSegmentImplUpsertTest {
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
+ .setNullHandlingEnabled(true)
.build();
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
@@ -138,6 +139,7 @@ public class MutableSegmentImplUpsertTest {
// Confirm that both comparison column values have made it into the persisted upserted doc
Assert.assertEquals(1567205397L, _mutableSegmentImpl.getValue(2, "secondsSinceEpoch"));
Assert.assertEquals(1567205395L, _mutableSegmentImpl.getValue(2, "otherComparisonColumn"));
+ Assert.assertTrue(_mutableSegmentImpl.getDataSource("secondsSinceEpoch").getNullValueVector().isNull(2));
// bb
Assert.assertFalse(bitmap.contains(4));
@@ -146,6 +148,7 @@ public class MutableSegmentImplUpsertTest {
// Confirm that comparison column values have made it into the persisted upserted doc
Assert.assertEquals(1567205396L, _mutableSegmentImpl.getValue(5, "secondsSinceEpoch"));
Assert.assertEquals(Long.MIN_VALUE, _mutableSegmentImpl.getValue(5, "otherComparisonColumn"));
+ Assert.assertTrue(_mutableSegmentImpl.getDataSource("otherComparisonColumn").getNullValueVector().isNull(5));
}
}
}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java
new file mode 100644
index 0000000000..2fae498b44
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ComparisonColumnsTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.Arrays;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class ComparisonColumnsTest {
+ private void nullFill(Comparable[]... comparables) {
+ for (Comparable[] comps : comparables) {
+ Arrays.fill(comps, null);
+ }
+ }
+
+ @Test
+ public void testRealtimeComparison() {
+ Comparable[] newComparables = new Comparable[3];
+ Comparable[] persistedComparables = new Comparable[3];
+ ComparisonColumns alreadyPersisted = new ComparisonColumns(persistedComparables, 0);
+ ComparisonColumns toBeIngested = new ComparisonColumns(newComparables, 0);
+
+ // reject same col with smaller value
+ newComparables[0] = 1;
+ persistedComparables[0] = 2;
+ int comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, -1);
+
+ // persist same col with equal value
+ nullFill(newComparables, persistedComparables);
+ newComparables[0] = 2;
+ persistedComparables[0] = 2;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 0);
+
+ // persist same col with larger value
+ nullFill(newComparables, persistedComparables);
+ newComparables[0] = 2;
+ persistedComparables[0] = 1;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 1);
+ Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, null});
+
+ // persist doc with col which was previously null, even though its value is smaller than the previous non-null col
+ nullFill(newComparables, persistedComparables);
+ toBeIngested = new ComparisonColumns(newComparables, newComparables.length - 1);
+ newComparables[newComparables.length - 1] = 1;
+ persistedComparables[0] = 2;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 1);
+ Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, 1});
+
+ // persist new doc where existing doc has multiple non-null comparison values
+ nullFill(newComparables, persistedComparables);
+ toBeIngested = new ComparisonColumns(newComparables, 1);
+ newComparables[1] = 2;
+ Arrays.fill(persistedComparables, 1);
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 1);
+ Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{1, 2, 1});
+
+ // reject new doc where existing doc has multiple non-null comparison values
+ nullFill(newComparables, persistedComparables);
+ newComparables[1] = 0;
+ Arrays.fill(persistedComparables, 1);
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, -1);
+ }
+
+ @Test
+ public void testSealedComparison() {
+ // Remember to be cognizant of which scenarios are _actually_ possible in a sealed segment. The way in which docs
+ // are compared during realtime ingestion dictates the possible scenarios of persisted rows. Ex. it is not
+ // possible for 2 docs with the same primary key to have a mutually exclusive set of non-null values; if such a
+ // scenario arose during realtime ingestion, the values would be merged such that the newly persisted doc would
+ // have all non-null comparison values. We should avoid making tests pass for scenarios that are not intended to
+ // be supported.
+ Comparable[] newComparables = new Comparable[3];
+ Comparable[] persistedComparables = new Comparable[3];
+ ComparisonColumns alreadyPersisted =
+ new ComparisonColumns(persistedComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
+ ComparisonColumns toBeIngested =
+ new ComparisonColumns(newComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
+
+ // reject same col with smaller value
+ newComparables[0] = 1;
+ persistedComparables[0] = 2;
+ int comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, -1);
+
+ // persist same col with equal value
+ nullFill(newComparables, persistedComparables);
+ newComparables[0] = 2;
+ persistedComparables[0] = 2;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 0);
+ // Verify unchanged comparables in the case of SEALED comparison
+ Assert.assertEquals(toBeIngested.getValues(), newComparables);
+
+ // persist same col with larger value
+ nullFill(newComparables, persistedComparables);
+ newComparables[0] = 2;
+ persistedComparables[0] = 1;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 1);
+
+ // reject doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has 2
+ // null columns. The presence of null columns in one of the docs implies that it must have come before the doc
+ // with non-null columns.
+ nullFill(newComparables, persistedComparables);
+ newComparables[1] = 1;
+ persistedComparables[0] = 1;
+ persistedComparables[2] = 1;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, -1);
+
+ // persist doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has
+ nullFill(newComparables, persistedComparables);
+ newComparables[0] = 1;
+ newComparables[2] = 2;
+ persistedComparables[0] = 1;
+ persistedComparables[2] = 1;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 1);
+
+ // persist doc with non-null value where existing doc had null value in same column previously (but multiple
+ // non-null in other columns)
+ nullFill(newComparables, persistedComparables);
+ newComparables[0] = 1;
+ newComparables[1] = 1;
+ newComparables[2] = 1;
+ persistedComparables[0] = 1;
+ persistedComparables[2] = 1;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 1);
+
+ // reject doc where existing doc has all non-null comparison values, but _this_ doc has 2 null values.
+ // The presence of null columns in one of the docs implies that it must have come before the doc with non-null
+ // columns.
+ nullFill(newComparables, persistedComparables);
+ newComparables[1] = 1;
+ Arrays.fill(persistedComparables, 1);
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, -1);
+
+ // Persist doc where existing doc has all non-null comparison values, but _this_ doc has a larger value.
+ nullFill(newComparables, persistedComparables);
+ Arrays.fill(newComparables, 1);
+ Arrays.fill(persistedComparables, 1);
+ newComparables[1] = 2;
+ comparisonResult = toBeIngested.compareTo(alreadyPersisted);
+ Assert.assertEquals(comparisonResult, 1);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org