Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/23 18:59:49 UTC
[pinot] branch master updated: Add delete support to upsert tables (#10703)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 608cb8de2b Add delete support to upsert tables (#10703)
608cb8de2b is described below
commit 608cb8de2bd19c8497b03be07a087591bfd1c3bd
Author: Navina Ramesh <na...@apache.org>
AuthorDate: Fri Jun 23 11:59:43 2023 -0700
Add delete support to upsert tables (#10703)
---
.../realtime/LLRealtimeSegmentDataManager.java | 1 +
.../org/apache/pinot/core/plan/FilterPlanNode.java | 34 +-
...adataAndDictionaryAggregationPlanMakerTest.java | 8 +-
.../plan/maker/QueryOverrideWithHintsTest.java | 6 +
.../tests/BaseClusterIntegrationTest.java | 127 ++++++-
.../tests/ClusterIntegrationTestUtils.java | 74 ++++
.../tests/UpsertTableIntegrationTest.java | 372 +++++++++++++++++++++
.../UpsertTableSegmentUploadIntegrationTest.java | 6 +-
.../src/test/resources/gameScores_csv.tar.gz | Bin 0 -> 266 bytes
.../resources/gameScores_partial_upsert_csv.tar.gz | Bin 0 -> 297 bytes
.../resources/partial_upsert_table_test.schema | 35 ++
.../src/test/resources/upsert_table_test.schema | 41 +--
...st.schema => upsert_upload_segment_test.schema} | 0
...st.tar.gz => upsert_upload_segment_test.tar.gz} | Bin
.../indexsegment/immutable/EmptyIndexSegment.java | 6 +
.../immutable/ImmutableSegmentImpl.java | 10 +-
.../indexsegment/mutable/MutableSegmentImpl.java | 81 ++---
.../local/realtime/impl/RealtimeSegmentConfig.java | 19 +-
.../upsert/BasePartitionUpsertMetadataManager.java | 52 +--
.../upsert/BaseTableUpsertMetadataManager.java | 3 +-
...oncurrentMapPartitionUpsertMetadataManager.java | 113 +++++--
.../ConcurrentMapTableUpsertMetadataManager.java | 3 +-
.../pinot/segment/local/upsert/RecordInfo.java | 8 +-
.../pinot/segment/local/upsert/UpsertUtils.java | 40 +--
.../segment/local/utils/TableConfigUtils.java | 21 +-
.../dedup/PartitionDedupMetadataManagerTest.java | 11 +-
...rrentMapPartitionUpsertMetadataManagerTest.java | 360 ++++++++++++++++++--
.../segment/local/utils/TableConfigUtilsTest.java | 40 ++-
.../org/apache/pinot/segment/spi/IndexSegment.java | 11 +
.../pinot/server/api/TablesResourceTest.java | 2 +-
.../apache/pinot/spi/config/table/TableConfig.java | 6 +
.../pinot/spi/config/table/UpsertConfig.java | 14 +
32 files changed, 1297 insertions(+), 207 deletions(-)
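At a glance, this change teaches upsert tables to treat a designated boolean column as a soft-delete marker. A sketch of how the option is expected to appear in a realtime table config, assuming the JSON property name mirrors the new UpsertConfig#setDeleteRecordColumn setter added below:

    "upsertConfig": {
      "mode": "FULL",
      "deleteRecordColumn": "deleted"
    }

A record arriving with the delete column set to true keeps its primary key tracked in the upsert metadata but drops out of query results; a later record with a newer comparison value revives the key, as the new integration tests below exercise.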
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 99f3c6d802..3f655728bc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1425,6 +1425,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
+ .setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
.setFieldConfigList(tableConfig.getFieldConfigList());
// Create message decoder
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
index 588929f2ea..82c9d51a1e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java
@@ -81,24 +81,33 @@ public class FilterPlanNode implements PlanNode {
@Override
public BaseFilterOperator run() {
- // NOTE: Snapshot the validDocIds before reading the numDocs to prevent the latest updates getting lost
- ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
- MutableRoaringBitmap validDocIdsSnapshot =
- validDocIds != null && !_queryContext.isSkipUpsert() ? validDocIds.getMutableRoaringBitmap() : null;
+ // NOTE: Snapshot the queryableDocIds before reading the numDocs to prevent the latest updates getting lost
+ MutableRoaringBitmap queryableDocIdSnapshot = null;
+ if (!_queryContext.isSkipUpsert()) {
+ ThreadSafeMutableRoaringBitmap queryableDocIds = _indexSegment.getQueryableDocIds();
+ if (queryableDocIds != null) {
+ queryableDocIdSnapshot = queryableDocIds.getMutableRoaringBitmap();
+ } else {
+ ThreadSafeMutableRoaringBitmap validDocIds = _indexSegment.getValidDocIds();
+ if (validDocIds != null) {
+ queryableDocIdSnapshot = validDocIds.getMutableRoaringBitmap();
+ }
+ }
+ }
int numDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
FilterContext filter = _filter != null ? _filter : _queryContext.getFilter();
if (filter != null) {
BaseFilterOperator filterOperator = constructPhysicalOperator(filter, numDocs);
- if (validDocIdsSnapshot != null) {
- BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
+ if (queryableDocIdSnapshot != null) {
+ BaseFilterOperator validDocFilter = new BitmapBasedFilterOperator(queryableDocIdSnapshot, false, numDocs);
return FilterOperatorUtils.getAndFilterOperator(_queryContext, Arrays.asList(filterOperator, validDocFilter),
numDocs);
} else {
return filterOperator;
}
- } else if (validDocIdsSnapshot != null) {
- return new BitmapBasedFilterOperator(validDocIdsSnapshot, false, numDocs);
+ } else if (queryableDocIdSnapshot != null) {
+ return new BitmapBasedFilterOperator(queryableDocIdSnapshot, false, numDocs);
} else {
return new MatchAllFilterOperator(numDocs);
}
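This fallback is the heart of the query-side change: a segment with a delete column exposes queryableDocIds (valid docs minus soft-deleted docs), which the filter plan prefers; segments without one keep the old validDocIds behavior. A condensed sketch of the selection logic above, using the IndexSegment accessors from this diff:

    MutableRoaringBitmap snapshot = null;
    if (!queryContext.isSkipUpsert()) {
      // Prefer queryable docs (valid minus deleted); fall back to valid docs
      ThreadSafeMutableRoaringBitmap queryable = indexSegment.getQueryableDocIds();
      ThreadSafeMutableRoaringBitmap bitmap =
          queryable != null ? queryable : indexSegment.getValidDocIds();
      if (bitmap != null) {
        snapshot = bitmap.getMutableRoaringBitmap();
      }
    }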
@@ -250,9 +259,8 @@ public class FilterPlanNode implements PlanNode {
return new TextContainsFilterOperator(textIndexReader, (TextContainsPredicate) predicate, numDocs);
case TEXT_MATCH:
textIndexReader = dataSource.getTextIndex();
- Preconditions
- .checkState(textIndexReader != null, "Cannot apply TEXT_MATCH on column: %s without text index",
- column);
+ Preconditions.checkState(textIndexReader != null,
+ "Cannot apply TEXT_MATCH on column: %s without text index", column);
// We could check for real time and segment Lucene reader, but easier to check the other way round
if (textIndexReader instanceof NativeTextIndexReader
|| textIndexReader instanceof NativeMutableTextIndex) {
@@ -300,8 +308,8 @@ public class FilterPlanNode implements PlanNode {
return new MatchAllFilterOperator(numDocs);
}
default:
- predicateEvaluator = PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource,
- _queryContext);
+ predicateEvaluator =
+ PredicateEvaluatorProvider.getPredicateEvaluator(predicate, dataSource, _queryContext);
_predicateEvaluators.add(Pair.of(predicate, predicateEvaluator));
return FilterOperatorUtils.getLeafFilterOperator(predicateEvaluator, dataSource, numDocs,
_queryContext.isNullHandlingEnabled());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index 00fec795dc..47459fb691 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -114,8 +114,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
segmentGeneratorConfig.setTableName("testTable");
segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
segmentGeneratorConfig.setOutDir(INDEX_DIR.getAbsolutePath());
- segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED,
- "column6", "column7", "column11", "column17", "column18");
+ segmentGeneratorConfig.setIndexOn(StandardIndexes.inverted(), IndexConfig.ENABLED, "column6", "column7", "column11",
+ "column17", "column18");
// Build the index segment.
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
@@ -131,8 +131,8 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new ConcurrentMapPartitionUpsertMetadataManager("testTable_REALTIME", 0, Collections.singletonList("column6"),
- Collections.singletonList("daysSinceEpoch"), HashFunction.NONE, null, false, serverMetrics),
- new ThreadSafeMutableRoaringBitmap());
+ Collections.singletonList("daysSinceEpoch"), null, HashFunction.NONE, null, false, serverMetrics),
+ new ThreadSafeMutableRoaringBitmap(), null);
}
@AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
index 35ae0d685d..57f169258c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java
@@ -89,6 +89,12 @@ public class QueryOverrideWithHintsTest {
return null;
}
+ @Nullable
+ @Override
+ public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+ return null;
+ }
+
@Override
public GenericRow getRecord(int docId, GenericRow reuse) {
return null;
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 3d32305d76..6a6225bea5 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -37,6 +37,7 @@ import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
+import org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder;
import org.apache.pinot.plugin.stream.kafka.KafkaStreamConfigProperties;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
@@ -54,6 +55,8 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -396,11 +399,15 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
/**
* Creates a new Upsert enabled table config.
*/
- protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
+ protected TableConfig createUpsertTableConfig(File sampleAvroFile, String primaryKeyColumn, String deleteColumn,
+ int numPartitions) {
AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+ UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(deleteColumn);
+
return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName()).setSchemaName(getSchemaName())
.setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
.setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
@@ -409,7 +416,62 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
.setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
.setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
.setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
- .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build();
+ .setUpsertConfig(upsertConfig).build();
+ }
+
+ protected Map<String, String> getCSVDecoderProperties(@Nullable String delimiter,
+ @Nullable String csvHeaderProperty) {
+ String streamType = "kafka";
+ Map<String, String> csvDecoderProperties = new HashMap<>();
+ csvDecoderProperties.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ CSVMessageDecoder.class.getName());
+ if (delimiter != null) {
+ csvDecoderProperties.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.delimiter"),
+ delimiter);
+ }
+ if (csvHeaderProperty != null) {
+ csvDecoderProperties.put(StreamConfigProperties.constructStreamProperty(streamType, "decoder.prop.header"),
+ csvHeaderProperty);
+ }
+ return csvDecoderProperties;
+ }
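For reference, StreamConfigProperties.constructStreamProperty("kafka", prop) produces keys of the form "stream.kafka.<prop>", so with the delimiter and header used by the tests below this helper should yield roughly:

    stream.kafka.decoder.class.name = org.apache.pinot.plugin.inputformat.csv.CSVMessageDecoder
    stream.kafka.decoder.prop.delimiter = ,
    stream.kafka.decoder.prop.header = playerId,name,game,score,timestampInEpoch,deleted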
+
+ /**
+ * Creates a new upsert-enabled table config that consumes CSV records from Kafka.
+ */
+ protected TableConfig createCSVUpsertTableConfig(String tableName, @Nullable String schemaName,
+ @Nullable String kafkaTopicName, int numPartitions, Map<String, String> streamDecoderProperties,
+ UpsertConfig upsertConfig, String primaryKeyColumn) {
+ if (schemaName == null) {
+ schemaName = getSchemaName();
+ }
+ Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+ columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+
+ if (upsertConfig == null) {
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ }
+ if (kafkaTopicName == null) {
+ kafkaTopicName = getKafkaTopic();
+ }
+
+ Map<String, String> streamConfigsMap = getStreamConfigMap();
+ streamConfigsMap.put(
+ StreamConfigProperties.constructStreamProperty("kafka", StreamConfigProperties.STREAM_TOPIC_NAME),
+ kafkaTopicName);
+ streamConfigsMap.putAll(streamDecoderProperties);
+
+ return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName).setSchemaName(schemaName)
+ .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+ .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
+ .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+ .setLLC(useLlc()).setStreamConfigs(streamConfigsMap)
+ .setNullHandlingEnabled(UpsertConfig.Mode.PARTIAL.equals(upsertConfig.getMode()) || getNullHandlingEnabled())
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
+ .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1))
+ .setUpsertConfig(upsertConfig).build();
}
/**
@@ -498,21 +560,26 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
_queryGenerator = new QueryGenerator(avroFiles, tableName, tableName);
}
+ protected List<File> unpackAvroData(File outputDir)
+ throws Exception {
+ return unpackTarData(getAvroTarFileName(), outputDir);
+ }
+
/**
- * Unpack the tarred Avro data into the given directory.
+ * Unpack the tarred data into the given directory.
*
+ * @param tarFileName Input tar filename
* @param outputDir Output directory
* @return List of files unpacked.
* @throws Exception
*/
- protected List<File> unpackAvroData(File outputDir)
+ protected List<File> unpackTarData(String tarFileName, File outputDir)
throws Exception {
InputStream inputStream =
- BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(getAvroTarFileName());
+ BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(tarFileName);
Assert.assertNotNull(inputStream);
return TarGzCompressionUtils.untar(inputStream, outputDir);
}
-
/**
* Pushes the data in the given Avro files into a Kafka stream.
*
@@ -520,11 +587,57 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
*/
protected void pushAvroIntoKafka(List<File> avroFiles)
throws Exception {
-
ClusterIntegrationTestUtils.pushAvroIntoKafka(avroFiles, "localhost:" + getKafkaPort(), getKafkaTopic(),
getMaxNumKafkaMessagesPerBatch(), getKafkaMessageHeader(), getPartitionColumn(), injectTombstones());
}
+ /**
+ * Pushes the data in the given CSV file into a Kafka stream.
+ *
+ * @param csvFile CSV file containing the records to push
+ * @param kafkaTopic Kafka topic to push into
+ * @param partitionColumnIndex Optional index of the partition column
+ */
+ protected void pushCsvIntoKafka(File csvFile, String kafkaTopic, @Nullable Integer partitionColumnIndex)
+ throws Exception {
+ String kafkaBroker = "localhost:" + getKafkaPort();
+ StreamDataProducer producer = null;
+ try {
+ producer =
+ StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+ getDefaultKafkaProducerProperties(kafkaBroker));
+ ClusterIntegrationTestUtils.pushCsvIntoKafka(csvFile, kafkaTopic, partitionColumnIndex, injectTombstones(),
+ producer);
+ } catch (Exception e) {
+ if (producer != null) {
+ producer.close();
+ }
+ throw e;
+ }
+ }
+
+ protected void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic, @Nullable Integer partitionColumnIndex) {
+ String kafkaBroker = "localhost:" + getKafkaPort();
+ StreamDataProducer producer = null;
+ try {
+ producer =
+ StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
+ getDefaultKafkaProducerProperties(kafkaBroker));
+ ClusterIntegrationTestUtils.pushCsvIntoKafka(csvRecords, kafkaTopic, partitionColumnIndex, injectTombstones(),
+ producer);
+ } catch (Exception e) {
+ if (producer != null) {
+ producer.close();
+ }
+ // Rethrow instead of silently swallowing the failure, matching the File overload above
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Properties getDefaultKafkaProducerProperties(String kafkaBroker) {
+ Properties properties = new Properties();
+ properties.put("metadata.broker.list", kafkaBroker);
+ properties.put("serializer.class", "kafka.serializer.DefaultEncoder");
+ properties.put("request.required.acks", "1");
+ properties.put("partitioner.class", "kafka.producer.ByteArrayPartitioner");
+ return properties;
+ }
+
protected boolean injectTombstones() {
return false;
}
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
index 189931988e..204303e05e 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterIntegrationTestUtils.java
@@ -25,6 +25,7 @@ import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -55,6 +56,9 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.util.Utf8;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVRecord;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
@@ -337,6 +341,76 @@ public class ClusterIntegrationTestUtils {
TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
}
+ /**
+ * Push the records from the given CSV file into a Kafka stream.
+ *
+ * @param csvFile CSV file
+ * @param kafkaTopic Kafka topic
+ * @param partitionColumnIndex Optional index of the partition column
+ * @throws Exception
+ */
+ public static void pushCsvIntoKafka(File csvFile, String kafkaTopic,
+ @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
+ throws Exception {
+
+ if (injectTombstones) {
+ // publish lots of tombstones to livelock the consumer if it can't handle this properly
+ for (int i = 0; i < 1000; i++) {
+ // publish a tombstone first
+ producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+ }
+ }
+ CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
+ try (CSVParser parser = CSVParser.parse(csvFile, StandardCharsets.UTF_8, csvFormat)) {
+ for (CSVRecord csv : parser) {
+ byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+ : csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
+ List<String> cols = new ArrayList<>();
+ for (String col : csv) {
+ cols.add(col);
+ }
+ byte[] bytes = String.join(",", cols).getBytes(StandardCharsets.UTF_8);
+ producer.produce(kafkaTopic, keyBytes, bytes);
+ }
+ }
+ }
+
+ /**
+ * Push the given CSV records into a Kafka stream.
+ *
+ * @param csvRecords List of CSV record strings
+ * @param kafkaTopic Kafka topic
+ * @param partitionColumnIndex Optional index of the partition column
+ * @throws Exception
+ */
+ public static void pushCsvIntoKafka(List<String> csvRecords, String kafkaTopic,
+ @Nullable Integer partitionColumnIndex, boolean injectTombstones, StreamDataProducer producer)
+ throws Exception {
+
+ if (injectTombstones) {
+ // publish lots of tombstones to livelock the consumer if it can't handle this properly
+ for (int i = 0; i < 1000; i++) {
+ // publish a tombstone first
+ producer.produce(kafkaTopic, Longs.toByteArray(System.currentTimeMillis()), null);
+ }
+ }
+ CSVFormat csvFormat = CSVFormat.DEFAULT.withSkipHeaderRecord(true);
+ for (String recordCsv: csvRecords) {
+ try (CSVParser parser = CSVParser.parse(recordCsv, csvFormat)) {
+ for (CSVRecord csv : parser) {
+ byte[] keyBytes = (partitionColumnIndex == null) ? Longs.toByteArray(System.currentTimeMillis())
+ : csv.get(partitionColumnIndex).getBytes(StandardCharsets.UTF_8);
+ List<String> cols = new ArrayList<>();
+ for (String col : csv) {
+ cols.add(col);
+ }
+ byte[] bytes = String.join(",", cols).getBytes(StandardCharsets.UTF_8);
+ producer.produce(kafkaTopic, keyBytes, bytes);
+ }
+ }
+ }
+ }
+
/**
* Push the records from the given Avro files into a Kafka stream.
*
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
new file mode 100644
index 0000000000..5eb5469432
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableIntegrationTest.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.client.ResultSet;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+/**
+ * Input data - Scores of players
+ * Schema
+ * - Dimension fields: playerId:int (primary key), name:string, game:string, deleted:boolean
+ * - Metric fields: score:float
+ * - DateTime fields: timestampInEpoch:long
+ */
+public class UpsertTableIntegrationTest extends BaseClusterIntegrationTestSet {
+ private static final String INPUT_DATA_TAR_FILE = "gameScores_csv.tar.gz";
+ private static final String CSV_SCHEMA_HEADER = "playerId,name,game,score,timestampInEpoch,deleted";
+ private static final String PARTIAL_UPSERT_TABLE_SCHEMA = "partial_upsert_table_test.schema";
+ private static final String CSV_DELIMITER = ",";
+ private static final String TABLE_NAME = "gameScores";
+ private static final int NUM_SERVERS = 2;
+ private static final String PRIMARY_KEY_COL = "playerId";
+ protected static final String DELETE_COL = "deleted";
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+ // Start the Pinot cluster
+ startZk();
+ // Start a customized controller with more frequent realtime segment validation
+ startController();
+ startBroker();
+ startServers(NUM_SERVERS);
+
+ // Start Kafka and push data into Kafka
+ startKafka();
+
+ List<File> unpackDataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
+ pushCsvIntoKafka(unpackDataFiles.get(0), getKafkaTopic(), 0); // TODO: Fix
+
+ // Create and upload schema and table config
+ Schema schema = createSchema();
+ addSchema(schema);
+
+ Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+ TableConfig tableConfig = createCSVUpsertTableConfig(getTableName(), getSchemaName(), getKafkaTopic(),
+ getNumKafkaPartitions(), csvDecoderProperties, null, PRIMARY_KEY_COL);
+ addTableConfig(tableConfig);
+
+ // Wait for all documents loaded
+ waitForAllDocsLoaded(600_000L);
+
+ // Create partial upsert table schema
+ Schema partialUpsertSchema = createSchema(PARTIAL_UPSERT_TABLE_SCHEMA);
+ addSchema(partialUpsertSchema);
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(getTableName());
+ dropRealtimeTable(realtimeTableName);
+ stopServer();
+ stopBroker();
+ stopController();
+ stopKafka();
+ stopZk();
+ FileUtils.deleteDirectory(_tempDir);
+ }
+
+ @Override
+ protected String getSchemaFileName() {
+ return "upsert_table_test.schema";
+ }
+
+ @Override
+ protected String getSchemaName() {
+ return "playerScores";
+ }
+
+ @Nullable
+ @Override
+ protected String getTimeColumnName() {
+ return "timestampInEpoch";
+ }
+
+ @Override
+ protected String getPartitionColumn() {
+ return PRIMARY_KEY_COL;
+ }
+
+ @Override
+ protected String getTableName() {
+ return TABLE_NAME;
+ }
+
+ @Override
+ protected long getCountStarResult() {
+ // Three distinct records are expected with pk values of 100, 101, 102
+ return 3;
+ }
+
+ private Schema createSchema(String schemaFileName)
+ throws IOException {
+ InputStream inputStream =
+ BaseClusterIntegrationTest.class.getClassLoader().getResourceAsStream(schemaFileName);
+ Assert.assertNotNull(inputStream);
+ return Schema.fromInputStream(inputStream);
+ }
+
+ private long queryCountStarWithoutUpsert(String tableName) {
+ return getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName + " OPTION(skipUpsert=true)")
+ .getResultSet(0).getLong(0);
+ }
+
+ private long getCountStarResultWithoutUpsert() {
+ return 10;
+ }
+
+ @Override
+ protected void waitForAllDocsLoaded(long timeoutMs)
+ throws Exception {
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return queryCountStarWithoutUpsert(getTableName()) == getCountStarResultWithoutUpsert();
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, timeoutMs, "Failed to load all documents");
+ assertEquals(getCurrentCountStarResult(), getCountStarResult());
+ }
+
+ @Test
+ protected void testDeleteWithFullUpsert()
+ throws Exception {
+ final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(DELETE_COL);
+
+ testDeleteWithFullUpsert(getKafkaTopic() + "-with-deletes", "gameScoresWithDelete", upsertConfig);
+ }
+
+ protected void testDeleteWithFullUpsert(String kafkaTopicName, String tableName, UpsertConfig upsertConfig)
+ throws Exception {
+ // SETUP
+ // Create table with delete Record column
+ Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+ TableConfig tableConfig = createCSVUpsertTableConfig(tableName, getSchemaName(), kafkaTopicName,
+ getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL);
+ addTableConfig(tableConfig);
+
+ // Push initial 10 upsert records - 3 pks 100, 101 and 102
+ List<File> dataFiles = unpackTarData(INPUT_DATA_TAR_FILE, _tempDir);
+ pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+
+ // TEST 1: Delete existing primary key
+ // Push 2 records with deleted = true - deletes pks 100 and 102
+ List<String> deleteRecords = List.of("102,Clifford,counter-strike,102,1681054200000,true",
+ "100,Zook,counter-strike,2050,1681377200000,true");
+ pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
+
+ // Wait for all docs (12 with skipUpsert=true) to be loaded
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return queryCountStarWithoutUpsert(tableName) == 12;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+
+ // Query for number of records in the table - should only return 1
+ ResultSet rs = getPinotConnection().execute("SELECT * FROM " + tableName).getResultSet(0);
+ Assert.assertEquals(rs.getRowCount(), 1);
+
+ // pk 101 - not deleted - only record available
+ int columnCount = rs.getColumnCount();
+ int playerIdColumnIndex = -1;
+ for (int i = 0; i < columnCount; i++) {
+ String columnName = rs.getColumnName(i);
+ if ("playerId".equalsIgnoreCase(columnName)) {
+ playerIdColumnIndex = i;
+ break;
+ }
+ }
+ Assert.assertNotEquals(playerIdColumnIndex, -1);
+ Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
+
+ // Validate deleted records
+ rs = getPinotConnection()
+ .execute("SELECT playerId FROM " + tableName
+ + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+ Assert.assertEquals(rs.getRowCount(), 2);
+ for (int i = 0; i < rs.getRowCount(); i++) {
+ String playerId = rs.getString(i, 0);
+ Assert.assertTrue("100".equalsIgnoreCase(playerId) || "102".equalsIgnoreCase(playerId));
+ }
+
+ // TEST 2: Revive a previously deleted primary key
+ // Revive pk - 100 by adding a record with a newer timestamp
+ List<String> revivedRecord = Collections.singletonList("100,Zook-New,,0.0,1684707335000,false");
+ pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
+ // Wait for the new record (13 with skipUpsert=true) to be indexed
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return queryCountStarWithoutUpsert(tableName) == 13;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithFullUpsert");
+
+ // Validate: pk is queryable and all columns are overwritten with new value
+ rs = getPinotConnection()
+ .execute("SELECT playerId, name, game FROM " + tableName + " WHERE playerId = 100").getResultSet(0);
+ Assert.assertEquals(rs.getRowCount(), 1);
+ Assert.assertEquals(rs.getInt(0, 0), 100);
+ Assert.assertEquals(rs.getString(0, 1), "Zook-New");
+ Assert.assertEquals(rs.getString(0, 2), "null");
+
+ // Validate: pk lineage still exists
+ rs = getPinotConnection()
+ .execute("SELECT playerId, name FROM " + tableName
+ + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+
+ Assert.assertTrue(rs.getRowCount() > 1);
+
+ // TEARDOWN
+ dropRealtimeTable(tableName);
+ }
+
+ @Test
+ public void testDeleteWithPartialUpsert()
+ throws Exception {
+ final UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.PARTIAL);
+ upsertConfig.setDeleteRecordColumn(DELETE_COL);
+
+ testDeleteWithPartialUpsert(getKafkaTopic() + "-partial-upsert-with-deletes",
+ "gameScoresPartialUpsertWithDelete", upsertConfig);
+ }
+
+ protected void testDeleteWithPartialUpsert(String kafkaTopicName, String tableName, UpsertConfig upsertConfig)
+ throws Exception {
+ final String partialUpsertSchemaName = "playerScoresPartialUpsert";
+ final String inputDataTarFile = "gameScores_partial_upsert_csv.tar.gz";
+
+ Map<String, UpsertConfig.Strategy> partialUpsertStrategies = new HashMap<>();
+ partialUpsertStrategies.put("game", UpsertConfig.Strategy.UNION);
+ partialUpsertStrategies.put("score", UpsertConfig.Strategy.INCREMENT);
+ partialUpsertStrategies.put(DELETE_COL, UpsertConfig.Strategy.OVERWRITE);
+ upsertConfig.setPartialUpsertStrategies(partialUpsertStrategies);
+
+ // Create table with delete Record column
+ Map<String, String> csvDecoderProperties = getCSVDecoderProperties(CSV_DELIMITER, CSV_SCHEMA_HEADER);
+ TableConfig tableConfig = createCSVUpsertTableConfig(tableName, partialUpsertSchemaName, kafkaTopicName,
+ getNumKafkaPartitions(), csvDecoderProperties, upsertConfig, PRIMARY_KEY_COL);
+ addTableConfig(tableConfig);
+
+ // Push initial 10 upsert records - 3 pks 100, 101 and 102
+ List<File> dataFiles = unpackTarData(inputDataTarFile, _tempDir);
+ pushCsvIntoKafka(dataFiles.get(0), kafkaTopicName, 0);
+
+ // TEST 1: Delete existing primary key
+ // Push 2 records with deleted = true - deletes pks 100 and 102
+ List<String> deleteRecords = List.of("102,Clifford,counter-strike,102,1681054200000,true",
+ "100,Zook,counter-strike,2050,1681377200000,true");
+ pushCsvIntoKafka(deleteRecords, kafkaTopicName, 0);
+
+ // Wait for all docs (12 with skipUpsert=true) to be loaded
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return queryCountStarWithoutUpsert(tableName) == 12;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithPartialUpsert");
+
+ // Query for number of records in the table - should only return 1
+ ResultSet rs = getPinotConnection().execute("SELECT * FROM " + tableName).getResultSet(0);
+ Assert.assertEquals(rs.getRowCount(), 1);
+
+ // pk 101 - not deleted - only record available
+ int columnCount = rs.getColumnCount();
+ int playerIdColumnIndex = -1;
+ for (int i = 0; i < columnCount; i++) {
+ String columnName = rs.getColumnName(i);
+ if ("playerId".equalsIgnoreCase(columnName)) {
+ playerIdColumnIndex = i;
+ break;
+ }
+ }
+ Assert.assertNotEquals(playerIdColumnIndex, -1);
+ Assert.assertEquals(rs.getString(0, playerIdColumnIndex), "101");
+
+ // Validate deleted records
+ rs = getPinotConnection()
+ .execute("SELECT playerId FROM " + tableName
+ + " WHERE deleted = true OPTION(skipUpsert=true)").getResultSet(0);
+ Assert.assertEquals(rs.getRowCount(), 2);
+ for (int i = 0; i < rs.getRowCount(); i++) {
+ String playerId = rs.getString(i, 0);
+ Assert.assertTrue("100".equalsIgnoreCase(playerId) || "102".equalsIgnoreCase(playerId));
+ }
+
+ // TEST 2: Revive a previously deleted primary key
+ // Revive pk - 100 by adding a record with a newer timestamp
+ List<String> revivedRecord = Collections.singletonList("100,Zook,,0.0,1684707335000,false");
+ pushCsvIntoKafka(revivedRecord, kafkaTopicName, 0);
+ // Wait for the new record (13 with skipUpsert=true) to be indexed
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return queryCountStarWithoutUpsert(tableName) == 13;
+ } catch (Exception e) {
+ return null;
+ }
+ }, 100L, 600_000L, "Failed to load all upsert records for testDeleteWithPartialUpsert");
+
+ // Validate: pk is queryable and all columns are overwritten with new value
+ rs = getPinotConnection()
+ .execute("SELECT playerId, name, game FROM " + tableName
+ + " WHERE playerId = 100").getResultSet(0);
+ Assert.assertEquals(rs.getRowCount(), 1);
+ Assert.assertEquals(rs.getInt(0, 0), 100);
+ Assert.assertEquals(rs.getString(0, 1), "Zook");
+ Assert.assertEquals(rs.getString(0, 2), "[\"null\"]");
+
+ // Validate: pk lineage still exists
+ rs = getPinotConnection()
+ .execute("SELECT playerId, name FROM " + tableName
+ + " WHERE playerId = 100 OPTION(skipUpsert=true)").getResultSet(0);
+
+ Assert.assertTrue(rs.getRowCount() > 1);
+
+ // TEARDOWN
+ dropRealtimeTable(tableName);
+ }
+}
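The tests above lean on the skipUpsert query option to separate the logical view from the physical rows. Two illustrative queries against the gameScores table used here (table and column names are the test's own):

    -- Logical view: soft-deleted primary keys are filtered out via queryableDocIds
    SELECT COUNT(*) FROM gameScores;

    -- Physical view: every ingested row, including delete markers and older versions
    SELECT COUNT(*) FROM gameScores OPTION(skipUpsert=true);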
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index e7966e3608..7fddd5e43c 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -74,7 +74,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
// Create and upload schema and table config
Schema schema = createSchema();
addSchema(schema);
- TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
+ TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions());
addTableConfig(tableConfig);
// Create and upload segments
@@ -116,7 +116,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
@Override
protected String getSchemaFileName() {
- return "upsert_table_test.schema";
+ return "upsert_upload_segment_test.schema";
}
@Override
@@ -126,7 +126,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
@Override
protected String getAvroTarFileName() {
- return "upsert_test.tar.gz";
+ return "upsert_upload_segment_test.tar.gz";
}
@Override
diff --git a/pinot-integration-tests/src/test/resources/gameScores_csv.tar.gz b/pinot-integration-tests/src/test/resources/gameScores_csv.tar.gz
new file mode 100644
index 0000000000..48dc5163c6
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/gameScores_csv.tar.gz differ
diff --git a/pinot-integration-tests/src/test/resources/gameScores_partial_upsert_csv.tar.gz b/pinot-integration-tests/src/test/resources/gameScores_partial_upsert_csv.tar.gz
new file mode 100644
index 0000000000..a6fd014e96
Binary files /dev/null and b/pinot-integration-tests/src/test/resources/gameScores_partial_upsert_csv.tar.gz differ
diff --git a/pinot-integration-tests/src/test/resources/partial_upsert_table_test.schema b/pinot-integration-tests/src/test/resources/partial_upsert_table_test.schema
new file mode 100644
index 0000000000..ceb2f2d287
--- /dev/null
+++ b/pinot-integration-tests/src/test/resources/partial_upsert_table_test.schema
@@ -0,0 +1,35 @@
+{
+ "schemaName": "playerScoresPartialUpsert",
+ "dimensionFieldSpecs": [
+ {
+ "name": "playerId",
+ "dataType": "INT"
+ },
+ {
+ "name": "name",
+ "dataType": "STRING"
+ },
+ {
+ "name": "game",
+ "dataType": "STRING",
+ "singleValueField": "false"
+ },
+ {
+ "name": "deleted",
+ "dataType": "BOOLEAN"
+ }
+ ],
+ "metricFieldSpecs": [
+ {
+ "name": "score",
+ "dataType": "FLOAT"
+ }
+ ],
+ "dateTimeFieldSpecs": [{
+ "name": "timestampInEpoch",
+ "dataType": "LONG",
+ "format" : "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }],
+ "primaryKeyColumns": [ "playerId" ]
+}
diff --git a/pinot-integration-tests/src/test/resources/upsert_table_test.schema b/pinot-integration-tests/src/test/resources/upsert_table_test.schema
index 3f656f7936..c0bcebfaae 100644
--- a/pinot-integration-tests/src/test/resources/upsert_table_test.schema
+++ b/pinot-integration-tests/src/test/resources/upsert_table_test.schema
@@ -1,33 +1,34 @@
{
+ "schemaName": "playerScores",
"dimensionFieldSpecs": [
{
- "dataType": "INT",
- "singleValueField": true,
- "name": "clientId"
+ "name": "playerId",
+ "dataType": "INT"
},
{
- "dataType": "STRING",
- "singleValueField": true,
- "name": "city"
+ "name": "name",
+ "dataType": "STRING"
},
{
- "dataType": "STRING",
- "singleValueField": true,
- "name": "description"
+ "name": "game",
+ "dataType": "STRING"
},
{
- "dataType": "INT",
- "singleValueField": true,
- "name": "salary"
+ "name": "deleted",
+ "dataType": "BOOLEAN"
}
],
- "timeFieldSpec": {
- "incomingGranularitySpec": {
- "timeType": "DAYS",
- "dataType": "INT",
- "name": "DaysSinceEpoch"
+ "metricFieldSpecs": [
+ {
+ "name": "score",
+ "dataType": "FLOAT"
}
- },
- "primaryKeyColumns": ["clientId"],
- "schemaName": "upsertSchema"
+ ],
+ "dateTimeFieldSpecs": [{
+ "name": "timestampInEpoch",
+ "dataType": "LONG",
+ "format" : "1:MILLISECONDS:EPOCH",
+ "granularity": "1:MILLISECONDS"
+ }],
+ "primaryKeyColumns": [ "playerId" ]
}
diff --git a/pinot-integration-tests/src/test/resources/upsert_table_test.schema b/pinot-integration-tests/src/test/resources/upsert_upload_segment_test.schema
similarity index 100%
copy from pinot-integration-tests/src/test/resources/upsert_table_test.schema
copy to pinot-integration-tests/src/test/resources/upsert_upload_segment_test.schema
diff --git a/pinot-integration-tests/src/test/resources/upsert_test.tar.gz b/pinot-integration-tests/src/test/resources/upsert_upload_segment_test.tar.gz
similarity index 100%
rename from pinot-integration-tests/src/test/resources/upsert_test.tar.gz
rename to pinot-integration-tests/src/test/resources/upsert_upload_segment_test.tar.gz
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index 20c0c764b4..681e93a0e0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -91,6 +91,12 @@ public class EmptyIndexSegment implements ImmutableSegment {
return null;
}
+ @Nullable
+ @Override
+ public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+ return null;
+ }
+
@Override
public GenericRow getRecord(int docId, GenericRow reuse) {
throw new UnsupportedOperationException("Cannot read record from empty segment");
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 66875eb726..dcf9d65362 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -76,6 +76,7 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
// For upsert
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private ThreadSafeMutableRoaringBitmap _validDocIds;
+ private ThreadSafeMutableRoaringBitmap _queryableDocIds;
public ImmutableSegmentImpl(SegmentDirectory segmentDirectory, SegmentMetadataImpl segmentMetadata,
Map<String, ColumnIndexContainer> columnIndexContainerMap,
@@ -100,9 +101,10 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
* Enables upsert for this segment. It should be called before the segment getting queried.
*/
public void enableUpsert(PartitionUpsertMetadataManager partitionUpsertMetadataManager,
- ThreadSafeMutableRoaringBitmap validDocIds) {
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds) {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_validDocIds = validDocIds;
+ _queryableDocIds = queryableDocIds;
}
@Nullable
@@ -289,6 +291,12 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
return _validDocIds;
}
+ @Nullable
+ @Override
+ public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+ return _queryableDocIds;
+ }
+
@Override
public GenericRow getRecord(int docId, GenericRow reuse) {
try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index 590f8a124c..2c6885ace5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -92,7 +92,6 @@ import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.IndexConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
-import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
@@ -102,6 +101,7 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.apache.pinot.spi.stream.RowMetadata;
+import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.FixedIntArray;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -159,10 +159,11 @@ public class MutableSegmentImpl implements MutableSegment {
private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders _realtimeLuceneReaders;
- private final UpsertConfig.Mode _upsertMode;
- private final List<String> _upsertComparisonColumns;
- private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
+
+ private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
+ private final List<String> _upsertComparisonColumns;
+ private final String _deleteRecordColumn;
// The valid doc ids are maintained locally instead of in the upsert metadata manager because:
// 1. There is only one consuming segment per partition, the committed segments do not need to modify the valid doc
// ids for the consuming segment.
@@ -172,6 +173,7 @@ public class MutableSegmentImpl implements MutableSegment {
// consumption with newer timestamp (late event in consuming segment), the record location will be updated, but
// the valid doc ids won't be updated.
private final ThreadSafeMutableRoaringBitmap _validDocIds;
+ private final ThreadSafeMutableRoaringBitmap _queryableDocIds;
public MutableSegmentImpl(RealtimeSegmentConfig config, @Nullable ServerMetrics serverMetrics) {
_serverMetrics = serverMetrics;
@@ -356,22 +358,27 @@ public class MutableSegmentImpl implements MutableSegment {
realtimeLuceneIndexRefreshState.addRealtimeReadersToQueue(_realtimeLuceneReaders);
}
- // init upsert-related data structure
- _upsertMode = config.getUpsertMode();
_partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();
- if (isUpsertEnabled()) {
+ _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
+ if (_partitionUpsertMetadataManager != null) {
Preconditions.checkState(!isAggregateMetricsEnabled(),
"Metrics aggregation and upsert cannot be enabled together");
- _partitionUpsertMetadataManager = config.getPartitionUpsertMetadataManager();
- _validDocIds = new ThreadSafeMutableRoaringBitmap();
List<String> upsertComparisonColumns = config.getUpsertComparisonColumns();
_upsertComparisonColumns =
upsertComparisonColumns != null ? upsertComparisonColumns : Collections.singletonList(_timeColumnName);
+ _deleteRecordColumn = config.getUpsertDeleteRecordColumn();
+ _validDocIds = new ThreadSafeMutableRoaringBitmap();
+ if (_deleteRecordColumn != null) {
+ _queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+ } else {
+ _queryableDocIds = null;
+ }
} else {
- _partitionUpsertMetadataManager = null;
- _validDocIds = null;
_upsertComparisonColumns = null;
+ _deleteRecordColumn = null;
+ _validDocIds = null;
+ _queryableDocIds = null;
}
}
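After this constructor change, the segment's upsert wiring is derived entirely from what the config carries, which the revised isUpsertEnabled() below relies on. The resulting invariants, summarized as comments:

    // upsert enabled    <=>  _partitionUpsertMetadataManager != null
    // _validDocIds      is non-null whenever upsert is enabled
    // _queryableDocIds  is non-null only when a delete column is also configured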
@@ -459,21 +466,18 @@ public class MutableSegmentImpl implements MutableSegment {
boolean canTakeMore;
int numDocsIndexed = _numDocsIndexed;
- RecordInfo recordInfo = null;
-
- if (isDedupEnabled() || isUpsertEnabled()) {
- recordInfo = getRecordInfo(row, numDocsIndexed);
- }
-
- if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(),
- this)) {
- if (_serverMetrics != null) {
- _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
+ if (isDedupEnabled()) {
+ PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
+ if (_partitionDedupMetadataManager.checkRecordPresentOrUpdate(primaryKey, this)) {
+ if (_serverMetrics != null) {
+ _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
+ }
+ return true;
}
- return true;
}
if (isUpsertEnabled()) {
+ RecordInfo recordInfo = getRecordInfo(row, numDocsIndexed);
GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
@@ -512,7 +516,7 @@ public class MutableSegmentImpl implements MutableSegment {
}
private boolean isUpsertEnabled() {
- return _upsertMode != UpsertConfig.Mode.NONE;
+ return _partitionUpsertMetadataManager != null;
}
private boolean isDedupEnabled() {
@@ -521,22 +525,18 @@ public class MutableSegmentImpl implements MutableSegment {
private RecordInfo getRecordInfo(GenericRow row, int docId) {
PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
-
- if (isUpsertEnabled()) {
- if (_upsertComparisonColumns.size() > 1) {
- return multiComparisonRecordInfo(primaryKey, docId, row);
- }
- Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0));
- return new RecordInfo(primaryKey, docId, comparisonValue);
- }
-
- return new RecordInfo(primaryKey, docId, null);
+ Comparable comparisonValue = getComparisonValue(row);
+ boolean deleteRecord = _deleteRecordColumn != null && BooleanUtils.toBoolean(row.getValue(_deleteRecordColumn));
+ return new RecordInfo(primaryKey, docId, comparisonValue, deleteRecord);
}
- private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) {
+ private Comparable getComparisonValue(GenericRow row) {
int numComparisonColumns = _upsertComparisonColumns.size();
- Comparable[] comparisonValues = new Comparable[numComparisonColumns];
+ if (numComparisonColumns == 1) {
+ return (Comparable) row.getValue(_upsertComparisonColumns.get(0));
+ }
+ Comparable[] comparisonValues = new Comparable[numComparisonColumns];
int comparableIndex = -1;
for (int i = 0; i < numComparisonColumns; i++) {
String columnName = _upsertComparisonColumns.get(i);
@@ -557,9 +557,8 @@ public class MutableSegmentImpl implements MutableSegment {
comparisonValues[i] = (Comparable) comparisonValue;
}
}
- Preconditions.checkState(comparableIndex != -1,
- "Documents must have exactly 1 non-null comparison column value");
- return new RecordInfo(primaryKey, docId, new ComparisonColumns(comparisonValues, comparableIndex));
+ Preconditions.checkState(comparableIndex != -1, "Documents must have exactly 1 non-null comparison column value");
+ return new ComparisonColumns(comparisonValues, comparableIndex);
}
private void updateDictionary(GenericRow row) {
@@ -863,6 +862,12 @@ public class MutableSegmentImpl implements MutableSegment {
return _validDocIds;
}
+ @Nullable
+ @Override
+ public ThreadSafeMutableRoaringBitmap getQueryableDocIds() {
+ return _queryableDocIds;
+ }
+
@Override
public GenericRow getRecord(int docId, GenericRow reuse) {
try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
index 0238330c91..f4fd501ac5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/impl/RealtimeSegmentConfig.java
@@ -61,6 +61,7 @@ public class RealtimeSegmentConfig {
private final boolean _nullHandlingEnabled;
private final UpsertConfig.Mode _upsertMode;
private final List<String> _upsertComparisonColumns;
+ private final String _upsertDeleteRecordColumn;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final String _consumerDir;
@@ -74,7 +75,7 @@ public class RealtimeSegmentConfig {
PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir, UpsertConfig.Mode upsertMode, List<String> upsertComparisonColumns,
- PartitionUpsertMetadataManager partitionUpsertMetadataManager,
+ String upsertDeleteRecordColumn, PartitionUpsertMetadataManager partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
List<AggregationConfig> ingestionAggregationConfigs) {
_tableNameWithType = tableNameWithType;
@@ -97,6 +98,7 @@ public class RealtimeSegmentConfig {
_consumerDir = consumerDir;
_upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
_upsertComparisonColumns = upsertComparisonColumns;
+ _upsertDeleteRecordColumn = upsertDeleteRecordColumn;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_fieldConfigList = fieldConfigList;
@@ -188,6 +190,10 @@ public class RealtimeSegmentConfig {
return _upsertComparisonColumns;
}
+ public String getUpsertDeleteRecordColumn() {
+ return _upsertDeleteRecordColumn;
+ }
+
public PartitionUpsertMetadataManager getPartitionUpsertMetadataManager() {
return _partitionUpsertMetadataManager;
}
@@ -225,6 +231,7 @@ public class RealtimeSegmentConfig {
private String _consumerDir;
private UpsertConfig.Mode _upsertMode;
private List<String> _upsertComparisonColumns;
+ private String _upsertDeleteRecordColumn;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private PartitionDedupMetadataManager _partitionDedupMetadataManager;
private List<FieldConfig> _fieldConfigList;
@@ -360,6 +367,11 @@ public class RealtimeSegmentConfig {
return this;
}
+ public Builder setUpsertDeleteRecordColumn(String upsertDeleteRecordColumn) {
+ _upsertDeleteRecordColumn = upsertDeleteRecordColumn;
+ return this;
+ }
+
public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
return this;
@@ -389,8 +401,9 @@ public class RealtimeSegmentConfig {
return new RealtimeSegmentConfig(_tableNameWithType, _segmentName, _streamName, _schema, _timeColumnName,
_capacity, _avgNumMultiValues, Collections.unmodifiableMap(indexConfigByCol), _segmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
- _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _partitionUpsertMetadataManager,
- _partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs);
+ _nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumns, _upsertDeleteRecordColumn,
+ _partitionUpsertMetadataManager, _partitionDedupMetadataManager, _fieldConfigList,
+ _ingestionAggregationConfigs);
}
}
}
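A minimal sketch of how the new builder hook is fed from the table config, mirroring the LLRealtimeSegmentDataManager change at the top of this diff (illustrative only; the real call site sets many more builder fields):

    RealtimeSegmentConfig config = new RealtimeSegmentConfig.Builder()
        .setUpsertComparisonColumns(tableConfig.getUpsertComparisonColumns())
        // Null when no delete column is configured; MutableSegmentImpl then skips _queryableDocIds
        .setUpsertDeleteRecordColumn(tableConfig.getUpsertDeleteRecordColumn())
        .setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
        .build();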
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 1501cd28e6..92c1a18749 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -57,6 +57,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected final int _partitionId;
protected final List<String> _primaryKeyColumns;
protected final List<String> _comparisonColumns;
+ protected final String _deleteRecordColumn;
protected final HashFunction _hashFunction;
protected final PartialUpsertHandler _partialUpsertHandler;
protected final boolean _enableSnapshot;
@@ -78,12 +79,14 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected int _numOutOfOrderEvents = 0;
protected BasePartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
- List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
- @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
+ List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
+ HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
+ ServerMetrics serverMetrics) {
_tableNameWithType = tableNameWithType;
_partitionId = partitionId;
_primaryKeyColumns = primaryKeyColumns;
_comparisonColumns = comparisonColumns;
+ _deleteRecordColumn = deleteRecordColumn;
_hashFunction = hashFunction;
_partialUpsertHandler = partialUpsertHandler;
_enableSnapshot = enableSnapshot;
@@ -138,7 +141,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
if (validDocIds != null && validDocIds.isEmpty()) {
_logger.info("Skip adding segment: {} without valid doc, current primary key count: {}",
segment.getSegmentName(), getNumPrimaryKeys());
- segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap());
+ segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
return;
}
} else {
@@ -146,8 +149,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
segment.deleteValidDocIdsSnapshot();
}
- try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
- _comparisonColumns)) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+ _comparisonColumns, _deleteRecordColumn)) {
Iterator<RecordInfo> recordInfoIterator;
if (validDocIds != null) {
recordInfoIterator = UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds);
@@ -155,7 +158,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
recordInfoIterator =
UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
}
- addSegment(segment, null, recordInfoIterator);
+ addSegment(segment, null, null, recordInfoIterator);
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while adding segment: %s, table: %s", segmentName, _tableNameWithType), e);
@@ -171,12 +174,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
/**
- * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
- * validDocIds should always be empty.
+ * NOTE: We allow passing in validDocIds and queryableDocIds here so that the values can be easily accessed from the
+ * tests. The passed-in bitmaps should always be empty.
*/
@VisibleForTesting
public void addSegment(ImmutableSegmentImpl segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
- Iterator<RecordInfo> recordInfoIterator) {
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
@@ -184,7 +187,10 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
if (validDocIds == null) {
validDocIds = new ThreadSafeMutableRoaringBitmap();
}
- addOrReplaceSegment(segment, validDocIds, recordInfoIterator, null, null);
+ if (queryableDocIds == null && _deleteRecordColumn != null) {
+ queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
+ addOrReplaceSegment(segment, validDocIds, queryableDocIds, recordInfoIterator, null, null);
} finally {
segmentLock.unlock();
}
@@ -193,8 +199,8 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
protected abstract long getNumPrimaryKeys();
protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
- Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
- @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+ @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment);
@Override
public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
@@ -254,15 +260,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
if (segment instanceof EmptyIndexSegment) {
_logger.info("Skip adding empty segment: {}", segmentName);
- replaceSegment(segment, null, null, oldSegment);
+ replaceSegment(segment, null, null, null, oldSegment);
return;
}
- try (UpsertUtils.RecordInfoReader recordInfoReader = UpsertUtils.makeRecordReader(segment, _primaryKeyColumns,
- _comparisonColumns)) {
+ try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
+ _comparisonColumns, _deleteRecordColumn)) {
Iterator<RecordInfo> recordInfoIterator =
UpsertUtils.getRecordInfoIterator(recordInfoReader, segment.getSegmentMetadata().getTotalDocs());
- replaceSegment(segment, null, recordInfoIterator, oldSegment);
+ replaceSegment(segment, null, null, recordInfoIterator, oldSegment);
} catch (Exception e) {
throw new RuntimeException(
String.format("Caught exception while replacing segment: %s, table: %s", segmentName, _tableNameWithType), e);
@@ -278,12 +284,13 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
}
/**
- * NOTE: We allow passing in validDocIds here so that the value can be easily accessed from the tests. The passed in
- * validDocIds should always be empty.
+ * NOTE: We allow passing in validDocIds and queryableDocIds here so that the values can be easily accessed from the
+ * tests. The passed-in bitmaps should always be empty.
*/
@VisibleForTesting
public void replaceSegment(ImmutableSegment segment, @Nullable ThreadSafeMutableRoaringBitmap validDocIds,
- @Nullable Iterator<RecordInfo> recordInfoIterator, IndexSegment oldSegment) {
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, @Nullable Iterator<RecordInfo> recordInfoIterator,
+ IndexSegment oldSegment) {
String segmentName = segment.getSegmentName();
Lock segmentLock = SegmentLocks.getSegmentLock(_tableNameWithType, segmentName);
segmentLock.lock();
@@ -297,8 +304,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
if (validDocIds == null) {
validDocIds = new ThreadSafeMutableRoaringBitmap();
}
- addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, recordInfoIterator, oldSegment,
- validDocIdsForOldSegment);
+ if (queryableDocIds == null && _deleteRecordColumn != null) {
+ queryableDocIds = new ThreadSafeMutableRoaringBitmap();
+ }
+ addOrReplaceSegment((ImmutableSegmentImpl) segment, validDocIds, queryableDocIds, recordInfoIterator,
+ oldSegment, validDocIdsForOldSegment);
}
if (validDocIdsForOldSegment != null && !validDocIdsForOldSegment.isEmpty()) {
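The core new state above is the pair of per-segment bitmaps: validDocIds tracks the doc holding the latest version of each primary key (delete records included, since a delete is the latest version of its key), while queryableDocIds additionally excludes docs whose latest record is a delete, and is only allocated when a delete column is configured. A minimal standalone demo of that invariant, using MutableRoaringBitmap from the RoaringBitmap library directly rather than Pinot's ThreadSafeMutableRoaringBitmap wrapper, with remove-then-add standing in for the wrapper's replace:

import java.util.Arrays;
import org.roaringbitmap.buffer.MutableRoaringBitmap;

// Demo: a delete record stays in validDocIds (it is the latest version of
// its key) but never enters queryableDocIds, so queries cannot see it.
public class DocIdBitmapsDemo {
  public static void main(String[] args) {
    MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();
    MutableRoaringBitmap queryableDocIds = new MutableRoaringBitmap();

    addDocId(validDocIds, queryableDocIds, 0, false); // regular record for key A
    addDocId(validDocIds, queryableDocIds, 1, true);  // delete record for key B
    // docId 2 replaces docId 0: a delete record for key A
    replaceDocId(validDocIds, queryableDocIds, 0, 2, true);

    System.out.println(Arrays.toString(validDocIds.toArray()));     // [1, 2]
    System.out.println(Arrays.toString(queryableDocIds.toArray())); // []
  }

  static void addDocId(MutableRoaringBitmap valid, MutableRoaringBitmap queryable, int docId,
      boolean isDeleteRecord) {
    valid.add(docId);
    if (!isDeleteRecord) {
      queryable.add(docId);
    }
  }

  static void replaceDocId(MutableRoaringBitmap valid, MutableRoaringBitmap queryable, int oldDocId, int newDocId,
      boolean isDeleteRecord) {
    valid.remove(oldDocId);
    valid.add(newDocId);
    queryable.remove(oldDocId);
    if (!isDeleteRecord) {
      queryable.add(newDocId);
    }
  }
}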
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 2ceb5ddc72..06e112b1e6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -37,6 +37,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
protected String _tableNameWithType;
protected List<String> _primaryKeyColumns;
protected List<String> _comparisonColumns;
+ protected String _deleteRecordColumn;
protected HashFunction _hashFunction;
protected PartialUpsertHandler _partialUpsertHandler;
protected boolean _enableSnapshot;
@@ -60,6 +61,7 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
_comparisonColumns = Collections.singletonList(tableConfig.getValidationConfig().getTimeColumnName());
}
+ _deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
_hashFunction = upsertConfig.getHashFunction();
if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) {
@@ -72,7 +74,6 @@ public abstract class BaseTableUpsertMetadataManager implements TableUpsertMetad
}
_enableSnapshot = upsertConfig.isEnableSnapshot();
-
_serverMetrics = serverMetrics;
}
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 29b293cbaa..6e8b195bbc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
@@ -57,10 +58,11 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
private final GenericRow _reuse = new GenericRow();
public ConcurrentMapPartitionUpsertMetadataManager(String tableNameWithType, int partitionId,
- List<String> primaryKeyColumns, List<String> comparisonColumns, HashFunction hashFunction,
- @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot, ServerMetrics serverMetrics) {
- super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, hashFunction, partialUpsertHandler,
- enableSnapshot, serverMetrics);
+ List<String> primaryKeyColumns, List<String> comparisonColumns, @Nullable String deleteRecordColumn,
+ HashFunction hashFunction, @Nullable PartialUpsertHandler partialUpsertHandler, boolean enableSnapshot,
+ ServerMetrics serverMetrics) {
+ super(tableNameWithType, partitionId, primaryKeyColumns, comparisonColumns, deleteRecordColumn, hashFunction,
+ partialUpsertHandler, enableSnapshot, serverMetrics);
}
@Override
@@ -70,29 +72,31 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
@Override
protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutableRoaringBitmap validDocIds,
- Iterator<RecordInfo> recordInfoIterator, @Nullable IndexSegment oldSegment,
- @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, Iterator<RecordInfo> recordInfoIterator,
+ @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap validDocIdsForOldSegment) {
String segmentName = segment.getSegmentName();
- segment.enableUpsert(this, validDocIds);
+ segment.enableUpsert(this, validDocIds, queryableDocIds);
AtomicInteger numKeysInWrongSegment = new AtomicInteger();
while (recordInfoIterator.hasNext()) {
RecordInfo recordInfo = recordInfoIterator.next();
+ int newDocId = recordInfo.getDocId();
+ Comparable newComparisonValue = recordInfo.getComparisonValue();
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
// Existing primary key
IndexSegment currentSegment = currentRecordLocation.getSegment();
- int comparisonResult =
- recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue());
+ int currentDocId = currentRecordLocation.getDocId();
+ int comparisonResult = newComparisonValue.compareTo(currentRecordLocation.getComparisonValue());
// The current record is in the same segment
// Update the record location when there is a tie to keep the newer record. Note that the record info
// iterator will return records with incremental doc ids.
if (currentSegment == segment) {
if (comparisonResult >= 0) {
- validDocIds.replace(currentRecordLocation.getDocId(), recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
return currentRecordLocation;
}
@@ -106,11 +110,11 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
// snapshot for the old segment, which can be updated and used to track the docs not replaced yet.
if (currentSegment == oldSegment) {
if (comparisonResult >= 0) {
- validDocIds.add(recordInfo.getDocId());
+ addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
if (validDocIdsForOldSegment != null) {
- validDocIdsForOldSegment.remove(currentRecordLocation.getDocId());
+ validDocIdsForOldSegment.remove(currentDocId);
}
- return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
return currentRecordLocation;
}
@@ -122,8 +126,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
if (currentSegmentName.equals(segmentName)) {
numKeysInWrongSegment.getAndIncrement();
if (comparisonResult >= 0) {
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
return currentRecordLocation;
}
@@ -137,16 +141,16 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
segmentName) && LLCSegmentName.isLowLevelConsumerSegmentName(currentSegmentName)
&& LLCSegmentName.getSequenceNumber(segmentName) > LLCSegmentName.getSequenceNumber(
currentSegmentName))) {
- Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentRecordLocation.getDocId());
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ removeDocId(currentSegment, currentDocId);
+ addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
return currentRecordLocation;
}
} else {
// New primary key
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId, newComparisonValue);
}
});
}
@@ -157,6 +161,34 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
}
}
+ private static void replaceDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int oldDocId, int newDocId, RecordInfo recordInfo) {
+ validDocIds.replace(oldDocId, newDocId);
+ if (queryableDocIds != null) {
+ if (recordInfo.isDeleteRecord()) {
+ queryableDocIds.remove(oldDocId);
+ } else {
+ queryableDocIds.replace(oldDocId, newDocId);
+ }
+ }
+ }
+
+ private static void addDocId(ThreadSafeMutableRoaringBitmap validDocIds,
+ @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, RecordInfo recordInfo) {
+ validDocIds.add(docId);
+ if (queryableDocIds != null && !recordInfo.isDeleteRecord()) {
+ queryableDocIds.add(docId);
+ }
+ }
+
+ private static void removeDocId(IndexSegment segment, int docId) {
+ Objects.requireNonNull(segment.getValidDocIds()).remove(docId);
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds = segment.getQueryableDocIds();
+ if (currentQueryableDocIds != null) {
+ currentQueryableDocIds.remove(docId);
+ }
+ }
+
@Override
protected void removeSegment(IndexSegment segment, MutableRoaringBitmap validDocIds) {
assert !validDocIds.isEmpty();
@@ -184,6 +216,9 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
@Override
protected void doAddRecord(MutableSegment segment, RecordInfo recordInfo) {
ThreadSafeMutableRoaringBitmap validDocIds = Objects.requireNonNull(segment.getValidDocIds());
+ ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
+ int newDocId = recordInfo.getDocId();
+ Comparable newComparisonValue = recordInfo.getComparisonValue();
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
(primaryKey, currentRecordLocation) -> {
if (currentRecordLocation != null) {
@@ -191,24 +226,24 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
// Update the record location when the new comparison value is greater than or equal to the current value.
// Update the record location when there is a tie to keep the newer record.
- if (recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
+ if (newComparisonValue.compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
IndexSegment currentSegment = currentRecordLocation.getSegment();
int currentDocId = currentRecordLocation.getDocId();
if (segment == currentSegment) {
- validDocIds.replace(currentDocId, recordInfo.getDocId());
+ replaceDocId(validDocIds, queryableDocIds, currentDocId, newDocId, recordInfo);
} else {
- Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId);
- validDocIds.add(recordInfo.getDocId());
+ removeDocId(currentSegment, currentDocId);
+ addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
}
- return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ return new RecordLocation(segment, newDocId, newComparisonValue);
} else {
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
return currentRecordLocation;
}
} else {
// New primary key
- validDocIds.add(recordInfo.getDocId());
- return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue());
+ addDocId(validDocIds, queryableDocIds, newDocId, recordInfo);
+ return new RecordLocation(segment, newDocId, newComparisonValue);
}
});
@@ -221,18 +256,32 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) {
assert _partialUpsertHandler != null;
AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>();
+ AtomicBoolean outOfOrder = new AtomicBoolean();
RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent(
HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> {
if (recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) {
- _reuse.clear();
- previousRecordReference.set(recordLocation.getSegment().getRecord(recordLocation.getDocId(), _reuse));
+ if (!recordInfo.isDeleteRecord()) {
+ IndexSegment currentSegment = recordLocation.getSegment();
+ int currentDocId = recordLocation.getDocId();
+ ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds();
+ if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) {
+ // If delete is not enabled, or the previous record is not marked as deleted
+ _reuse.clear();
+ previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
+ }
+ }
+ } else {
+ outOfOrder.set(true);
}
return recordLocation;
});
if (currentRecordLocation != null) {
// Existing primary key
- GenericRow previousRecord = previousRecordReference.get();
- if (previousRecord != null) {
+ if (!outOfOrder.get()) {
+ GenericRow previousRecord = previousRecordReference.get();
+ if (previousRecord == null) {
+ return record;
+ }
return _partialUpsertHandler.merge(previousRecord, record);
} else {
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
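In the partial-upsert path above, doUpdateRecord now skips reading the previous record in two cases: the incoming record is itself a delete, or the previous version of the key is absent from queryableDocIds because it was deleted. In both cases the incoming record is returned unmerged, so a revived key starts from a clean slate instead of merging with a tombstone; out-of-order events are still counted and left unmerged. A toy standalone model of that decision, with simplified types and a plain overwrite in place of PartialUpsertHandler.merge:

import java.util.HashMap;
import java.util.Map;

// Toy model of the doUpdateRecord decision: merge with the previous record
// only when the event is in order, the incoming record is not a delete, and
// the previous record was not itself deleted.
public class PartialUpsertMergeDemo {
  record Prev(long comparisonValue, boolean deleted, Map<String, Object> row) {}

  static Map<String, Object> resolve(Map<String, Prev> store, String pk, long newComparisonValue,
      boolean isDeleteRecord, Map<String, Object> newRow) {
    Prev prev = store.get(pk);
    if (prev == null || newComparisonValue < prev.comparisonValue()) {
      return newRow; // new key, or out-of-order event: taken as-is in this toy
    }
    if (isDeleteRecord || prev.deleted()) {
      return newRow; // nothing usable to merge with
    }
    Map<String, Object> merged = new HashMap<>(prev.row());
    merged.putAll(newRow); // simplistic overwrite merge
    return merged;
  }

  public static void main(String[] args) {
    Map<String, Prev> store = new HashMap<>();
    store.put("pk1", new Prev(100, true, Map.of("score", 10)));
    store.put("pk2", new Prev(100, false, Map.of("score", 10, "name", "a")));
    // pk1's previous version was deleted, so the new row is NOT merged:
    System.out.println(resolve(store, "pk1", 120, false, Map.of("score", 5)));
    // pk2's previous version is live, so the rows are merged:
    System.out.println(resolve(store, "pk2", 120, false, Map.of("score", 7)));
  }
}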
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
index cfc6e529a4..b08ea591d1 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapTableUpsertMetadataManager.java
@@ -36,7 +36,8 @@ public class ConcurrentMapTableUpsertMetadataManager extends BaseTableUpsertMeta
public ConcurrentMapPartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId) {
return _partitionMetadataManagerMap.computeIfAbsent(partitionId,
k -> new ConcurrentMapPartitionUpsertMetadataManager(_tableNameWithType, k, _primaryKeyColumns,
- _comparisonColumns, _hashFunction, _partialUpsertHandler, _enableSnapshot, _serverMetrics));
+ _comparisonColumns, _deleteRecordColumn, _hashFunction, _partialUpsertHandler, _enableSnapshot,
+ _serverMetrics));
}
@Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
index f4f139f6c3..4df31f27c0 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/RecordInfo.java
@@ -26,11 +26,13 @@ public class RecordInfo {
private final PrimaryKey _primaryKey;
private final int _docId;
private final Comparable _comparisonValue;
+ private final boolean _deleteRecord;
- public RecordInfo(PrimaryKey primaryKey, int docId, Comparable comparisonValue) {
+ public RecordInfo(PrimaryKey primaryKey, int docId, Comparable comparisonValue, boolean deleteRecord) {
_primaryKey = primaryKey;
_docId = docId;
_comparisonValue = comparisonValue;
+ _deleteRecord = deleteRecord;
}
public PrimaryKey getPrimaryKey() {
@@ -44,4 +46,8 @@ public class RecordInfo {
public Comparable getComparisonValue() {
return _comparisonValue;
}
+
+ public boolean isDeleteRecord() {
+ return _deleteRecord;
+ }
}
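RecordInfo simply grows a fourth constructor argument and a getter. A minimal construction example, assuming PrimaryKey's Object[] constructor and using made-up values:

import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.spi.data.readers.PrimaryKey;

public class RecordInfoDemo {
  public static void main(String[] args) {
    // Assumes PrimaryKey(Object[]) as the constructor; key and values are made up.
    PrimaryKey pk = new PrimaryKey(new Object[]{"user-42"});
    RecordInfo upsert = new RecordInfo(pk, /* docId */ 0, /* comparisonValue */ 100, /* deleteRecord */ false);
    RecordInfo delete = new RecordInfo(pk, /* docId */ 1, /* comparisonValue */ 120, /* deleteRecord */ true);
    System.out.println(upsert.isDeleteRecord() + " " + delete.isDeleteRecord()); // false true
  }
}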
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
index 6a7d3ed626..8dd1d8c475 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java
@@ -23,9 +23,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.data.readers.PrimaryKey;
+import org.apache.pinot.spi.utils.BooleanUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.PeekableIntIterator;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -75,31 +77,32 @@ public class UpsertUtils {
};
}
- public static RecordInfoReader makeRecordReader(IndexSegment segment, List<String> primaryKeyColumns,
- List<String> comparisonColumns) {
- if (comparisonColumns.size() > 1) {
- return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns);
- }
- return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns.get(0));
- }
-
public static class RecordInfoReader implements Closeable {
- public final PrimaryKeyReader _primaryKeyReader;
- public final ComparisonColumnReader _comparisonColumnReader;
+ private final PrimaryKeyReader _primaryKeyReader;
+ private final ComparisonColumnReader _comparisonColumnReader;
+ private final PinotSegmentColumnReader _deleteRecordColumnReader;
- public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, List<String> comparisonColumns) {
+ public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, List<String> comparisonColumns,
+ @Nullable String deleteRecordColumn) {
_primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
- _comparisonColumnReader = new MultiComparisonColumnReader(segment, comparisonColumns);
- }
-
- public RecordInfoReader(IndexSegment segment, List<String> primaryKeyColumns, String comparisonColumn) {
- _primaryKeyReader = new PrimaryKeyReader(segment, primaryKeyColumns);
- _comparisonColumnReader = new SingleComparisonColumnReader(segment, comparisonColumn);
+ if (comparisonColumns.size() == 1) {
+ _comparisonColumnReader = new SingleComparisonColumnReader(segment, comparisonColumns.get(0));
+ } else {
+ _comparisonColumnReader = new MultiComparisonColumnReader(segment, comparisonColumns);
+ }
+ if (deleteRecordColumn != null) {
+ _deleteRecordColumnReader = new PinotSegmentColumnReader(segment, deleteRecordColumn);
+ } else {
+ _deleteRecordColumnReader = null;
+ }
}
public RecordInfo getRecordInfo(int docId) {
PrimaryKey primaryKey = _primaryKeyReader.getPrimaryKey(docId);
- return new RecordInfo(primaryKey, docId, _comparisonColumnReader.getComparisonValue(docId));
+ Comparable comparisonValue = _comparisonColumnReader.getComparisonValue(docId);
+ boolean deleteRecord = _deleteRecordColumnReader != null
+ && BooleanUtils.toBoolean(_deleteRecordColumnReader.getValue(docId));
+ return new RecordInfo(primaryKey, docId, comparisonValue, deleteRecord);
}
@Override
@@ -151,7 +154,6 @@ public class UpsertUtils {
return value instanceof byte[] ? new ByteArray((byte[]) value) : value;
}
-
public interface ComparisonColumnReader extends Closeable {
Comparable getComparisonValue(int docId);
}
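RecordInfoReader now composes three per-doc readers, the third one optional: when no delete column is configured, the reader stays null and every record comes back with deleteRecord = false. A toy standalone version of that composition, with list-backed readers standing in for PinotSegmentColumnReader:

import java.util.List;
import java.util.function.IntFunction;

// Toy composition of per-doc readers; the delete-flag reader is optional.
public class RecordInfoReaderSketch {
  private final IntFunction<String> _pkReader;
  private final IntFunction<Integer> _comparisonReader;
  private final IntFunction<Boolean> _deleteReader; // null when deletes are disabled

  RecordInfoReaderSketch(IntFunction<String> pkReader, IntFunction<Integer> comparisonReader,
      IntFunction<Boolean> deleteReader) {
    _pkReader = pkReader;
    _comparisonReader = comparisonReader;
    _deleteReader = deleteReader;
  }

  String describe(int docId) {
    boolean deleteRecord = _deleteReader != null && _deleteReader.apply(docId);
    return _pkReader.apply(docId) + "/" + _comparisonReader.apply(docId) + (deleteRecord ? "/DELETE" : "");
  }

  public static void main(String[] args) {
    List<String> pks = List.of("a", "b");
    List<Integer> times = List.of(100, 120);
    List<Boolean> deletes = List.of(false, true);
    RecordInfoReaderSketch reader = new RecordInfoReaderSketch(pks::get, times::get, deletes::get);
    System.out.println(reader.describe(0)); // a/100
    System.out.println(reader.describe(1)); // b/120/DELETE
  }
}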
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 9ad82bc23c..89cac72cac 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -570,20 +570,29 @@ public final class TableConfigUtils {
"Upsert/Dedup table must use strict replica-group (i.e. strictReplicaGroup) based routing");
// specifically for upsert
- if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
-
+ UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
+ if (upsertConfig != null) {
// no startree index
Preconditions.checkState(CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs())
&& !tableConfig.getIndexingConfig().isEnableDefaultStarTree(),
"The upsert table cannot have star-tree index.");
// comparison column exists
- if (tableConfig.getUpsertConfig().getComparisonColumns() != null) {
- List<String> comparisonCols = tableConfig.getUpsertConfig().getComparisonColumns();
- for (String comparisonCol : comparisonCols) {
- Preconditions.checkState(schema.hasColumn(comparisonCol), "The comparison column does not exist on schema");
+ List<String> comparisonColumns = upsertConfig.getComparisonColumns();
+ if (comparisonColumns != null) {
+ for (String column : comparisonColumns) {
+ Preconditions.checkState(schema.hasColumn(column), "The comparison column does not exist in the schema");
}
}
+
+ // Delete record column exists and is a single-valued BOOLEAN field
+ String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
+ if (deleteRecordColumn != null) {
+ FieldSpec fieldSpec = schema.getFieldSpecFor(deleteRecordColumn);
+ Preconditions.checkState(
+ fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == DataType.BOOLEAN,
+ "The delete record column must be a single-valued BOOLEAN column");
+ }
}
validateAggregateMetricsForUpsertConfig(tableConfig);
}
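The new validation is easy to restate: deleteRecordColumn is optional, but when present it must name a schema column that is single-valued with BOOLEAN data type. A standalone mirror of that check, where FieldSpec and DataType are small stand-ins for the Pinot classes and a map stands in for Schema:

import java.util.Map;

// Mirror of the new delete-record-column check in TableConfigUtils.
public class DeleteColumnValidationDemo {
  enum DataType { INT, BOOLEAN, STRING }
  record FieldSpec(DataType dataType, boolean singleValue) {}

  static void validateDeleteRecordColumn(Map<String, FieldSpec> schema, String deleteRecordColumn) {
    if (deleteRecordColumn == null) {
      return; // deletes not enabled, nothing to validate
    }
    FieldSpec fieldSpec = schema.get(deleteRecordColumn);
    if (fieldSpec == null || !fieldSpec.singleValue() || fieldSpec.dataType() != DataType.BOOLEAN) {
      throw new IllegalStateException("The delete record column must be a single-valued BOOLEAN column");
    }
  }

  public static void main(String[] args) {
    Map<String, FieldSpec> schema = Map.of(
        "deleted", new FieldSpec(DataType.BOOLEAN, true),
        "name", new FieldSpec(DataType.STRING, true));
    validateDeleteRecordColumn(schema, "deleted"); // passes
    try {
      validateDeleteRecordColumn(schema, "name");
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // single-valued BOOLEAN required
    }
  }
}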
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
index c1c90fae97..0fed3d59dd 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManagerTest.java
@@ -25,7 +25,6 @@ import java.util.Map;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
-import org.apache.pinot.segment.local.upsert.RecordInfo;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.spi.config.table.HashFunction;
@@ -122,20 +121,16 @@ public class PartitionDedupMetadataManagerTest {
metadataManager.addSegment(segment1);
// Same PK exists
- RecordInfo recordInfo = mock(RecordInfo.class);
- when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(0));
ImmutableSegmentImpl segment2 = mockSegment(2);
- Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+ Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(0), segment2));
checkRecordLocation(recordLocationMap, 0, segment1, hashFunction);
// New PK
- when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
- Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+ Assert.assertFalse(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3), segment2));
checkRecordLocation(recordLocationMap, 3, segment2, hashFunction);
// Same PK as the one recently ingested
- when(recordInfo.getPrimaryKey()).thenReturn(getPrimaryKey(3));
- Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(recordInfo.getPrimaryKey(), segment2));
+ Assert.assertTrue(metadataManager.checkRecordPresentOrUpdate(getPrimaryKey(3), segment2));
}
private static ImmutableSegmentImpl mockSegment(int sequenceNumber) {
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index a89f7e9d64..02fa90abdb 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
@@ -44,6 +45,7 @@ import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
@@ -76,7 +78,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
@@ -86,7 +88,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
- ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, primaryKeys1);
+ ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, null, primaryKeys1);
List<RecordInfo> recordInfoList1;
if (enableSnapshot) {
// get recordInfo from validDocIdSnapshot.
@@ -94,12 +96,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
int[] docIds1 = new int[]{2, 4, 5};
MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
validDocIdsSnapshot1.add(docIds1);
- recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps);
+ recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, null);
} else {
// get recordInfo by iterating all records.
- recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+ recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
}
- upsertMetadataManager.addSegment(segment1, validDocIds1, recordInfoList1.iterator());
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
trackedSegments.add(segment1);
// segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
assertEquals(recordLocationMap.size(), 3);
@@ -113,7 +115,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
primaryKeys = new int[]{0, 1, 2, 3, 0};
timestamps = new int[]{100, 100, 120, 80, 80};
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
- ImmutableSegmentImpl segment2 = mockImmutableSegment(2, validDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys));
List<RecordInfo> recordInfoList2;
if (enableSnapshot) {
// get recordInfo from validDocIdSnapshot.
@@ -121,12 +124,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
// segment1 snapshot: 1 -> {4, 120}
MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
validDocIdsSnapshot2.add(0, 2, 3);
- recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps);
+ recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, null);
} else {
// get recordInfo by iterating all records.
- recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps);
+ recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
}
- upsertMetadataManager.addSegment(segment2, validDocIds2, recordInfoList2.iterator());
+ upsertMetadataManager.addSegment(segment2, validDocIds2, null, recordInfoList2.iterator());
trackedSegments.add(segment2);
// segment1: 1 -> {4, 120}
@@ -154,8 +157,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
// Replace (reload) the first segment
ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
- ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, primaryKeys1);
- upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, recordInfoList1.iterator(), segment1);
+ ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, null, primaryKeys1);
+ upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, null, recordInfoList1.iterator(), segment1);
trackedSegments.add(newSegment1);
trackedSegments.remove(segment1);
// original segment1: 1 -> {4, 120} (not in the map)
@@ -220,10 +223,198 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
- private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps) {
+ @Test
+ public void testAddReplaceRemoveSegmentWithRecordDelete()
+ throws IOException {
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE, false);
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5, false);
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, false);
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.NONE, true);
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MD5, true);
+ verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
+ }
+
+ private void verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction hashFunction, boolean enableSnapshot)
+ throws IOException {
+ String comparisonColumn = "timeCol";
+ String deleteRecordColumn = "deleteCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), deleteRecordColumn, hashFunction, null, false,
+ mock(ServerMetrics.class));
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+ Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+ // Add the first segment
+ int numRecords = 6;
+ int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+ int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+ boolean[] deleteFlags = new boolean[]{false, false, false, true, true, false};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+ ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, queryableDocIds1, primaryKeys1);
+ List<RecordInfo> recordInfoList1;
+ if (enableSnapshot) {
+ // get recordInfo from validDocIdSnapshot.
+ // segment1 snapshot: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ int[] docIds1 = new int[]{2, 4, 5};
+ MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
+ validDocIdsSnapshot1.add(docIds1);
+ recordInfoList1 = getRecordInfoList(validDocIdsSnapshot1, primaryKeys, timestamps, deleteFlags);
+ } else {
+ // get recordInfo by iterating all records.
+ recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, deleteFlags);
+ }
+ upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1, recordInfoList1.iterator());
+ trackedSegments.add(segment1);
+ // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+ assertEquals(recordLocationMap.size(), 3);
+ checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 5});
+
+ // Add the second segment
+ numRecords = 5;
+ primaryKeys = new int[]{0, 1, 2, 3, 0};
+ timestamps = new int[]{100, 100, 120, 80, 80};
+ deleteFlags = new boolean[]{false, true, true, false, false};
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment2 =
+ mockImmutableSegment(2, validDocIds2, queryableDocIds2, getPrimaryKeyList(numRecords, primaryKeys));
+ List<RecordInfo> recordInfoList2;
+ if (enableSnapshot) {
+ // get recordInfo from validDocIdSnapshot.
+ // segment2 snapshot: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // segment1 snapshot: 1 -> {4, 120}
+ MutableRoaringBitmap validDocIdsSnapshot2 = new MutableRoaringBitmap();
+ validDocIdsSnapshot2.add(0, 2, 3);
+ recordInfoList2 = getRecordInfoList(validDocIdsSnapshot2, primaryKeys, timestamps, deleteFlags);
+ } else {
+ // get recordInfo by iterating all records.
+ recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, deleteFlags);
+ }
+ upsertMetadataManager.addSegment(segment2, validDocIds2, queryableDocIds2, recordInfoList2.iterator());
+ trackedSegments.add(segment2);
+
+ // segment1: 1 -> {4, 120}
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+
+ // Add an empty segment
+ EmptyIndexSegment emptySegment = mockEmptySegment(3);
+ upsertMetadataManager.addSegment(emptySegment);
+ // segment1: 1 -> {4, 120}
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+
+ // Replace (reload) the first segment
+ ThreadSafeMutableRoaringBitmap newValidDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap newQueryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl newSegment1 = mockImmutableSegment(1, newValidDocIds1, newQueryableDocIds1, primaryKeys1);
+ upsertMetadataManager.replaceSegment(newSegment1, newValidDocIds1, newQueryableDocIds1, recordInfoList1.iterator(),
+ segment1);
+ trackedSegments.add(newSegment1);
+ trackedSegments.remove(segment1);
+
+ // original segment1: 1 -> {4, 120} (not in the map)
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+ Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+ // Remove the original segment1
+ upsertMetadataManager.removeSegment(segment1);
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ Assert.assertTrue(queryableDocIds1.getMutableRoaringBitmap().isEmpty());
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+ Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+ // Remove the empty segment
+ upsertMetadataManager.removeSegment(emptySegment);
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+ // new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 4);
+ checkRecordLocation(recordLocationMap, 0, segment2, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 2, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 80, hashFunction);
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+ Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+ // Remove segment2
+ upsertMetadataManager.removeSegment(segment2);
+ // segment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80} (not in the map)
+ // new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(trackedSegments, Collections.singleton(newSegment1));
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 3});
+ Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Remove new segment1, should be no-op
+ upsertMetadataManager.removeSegment(newSegment1);
+ // new segment1: 1 -> {4, 120}
+ assertEquals(recordLocationMap.size(), 1);
+ checkRecordLocation(recordLocationMap, 1, newSegment1, 4, 120, hashFunction);
+ assertEquals(newValidDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+ assertEquals(trackedSegments, Collections.singleton(newSegment1));
+ Assert.assertTrue(newQueryableDocIds1.getMutableRoaringBitmap().isEmpty());
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
+ }
+
+ private List<RecordInfo> getRecordInfoList(int numRecords, int[] primaryKeys, int[] timestamps,
+ @Nullable boolean[] deleteRecordFlags) {
List<RecordInfo> recordInfoList = new ArrayList<>();
for (int i = 0; i < numRecords; i++) {
- recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i])));
+ recordInfoList.add(new RecordInfo(makePrimaryKey(primaryKeys[i]), i, new IntWrapper(timestamps[i]),
+ deleteRecordFlags != null && deleteRecordFlags[i]));
}
return recordInfoList;
}
@@ -232,11 +423,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
* Get recordInfo from validDocIdsSnapshot (enabledSnapshot = True).
*/
private List<RecordInfo> getRecordInfoList(MutableRoaringBitmap validDocIdsSnapshot, int[] primaryKeys,
- int[] timestamps) {
+ int[] timestamps, @Nullable boolean[] deleteRecordFlags) {
List<RecordInfo> recordInfoList = new ArrayList<>();
Iterator<Integer> validDocIdsIterator = validDocIdsSnapshot.iterator();
validDocIdsIterator.forEachRemaining((docId) -> recordInfoList.add(
- new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, new IntWrapper(timestamps[docId]))));
+ new RecordInfo(makePrimaryKey(primaryKeys[docId]), docId, new IntWrapper(timestamps[docId]),
+ deleteRecordFlags != null && deleteRecordFlags[docId])));
return recordInfoList;
}
@@ -249,10 +441,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
}
private static ImmutableSegmentImpl mockImmutableSegment(int sequenceNumber,
- ThreadSafeMutableRoaringBitmap validDocIds, List<PrimaryKey> primaryKeys) {
+ ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds,
+ List<PrimaryKey> primaryKeys) {
ImmutableSegmentImpl segment = mock(ImmutableSegmentImpl.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
when(segment.getValidDocIds()).thenReturn(validDocIds);
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
DataSource dataSource = mock(DataSource.class);
when(segment.getDataSource(anyString())).thenReturn(dataSource);
ForwardIndexReader forwardIndex = mock(ForwardIndexReader.class);
@@ -270,9 +464,11 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
return new EmptyIndexSegment(segmentMetadata);
}
- private static MutableSegment mockMutableSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds) {
+ private static MutableSegment mockMutableSegment(int sequenceNumber, ThreadSafeMutableRoaringBitmap validDocIds,
+ ThreadSafeMutableRoaringBitmap queryableDocIds) {
MutableSegment segment = mock(MutableSegment.class);
when(segment.getSegmentName()).thenReturn(getSegmentName(sequenceNumber));
+ when(segment.getQueryableDocIds()).thenReturn(queryableDocIds);
when(segment.getValidDocIds()).thenReturn(validDocIds);
return segment;
}
@@ -308,7 +504,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
String comparisonColumn = "timeCol";
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
- Collections.singletonList(comparisonColumn), hashFunction, null, false, mock(ServerMetrics.class));
+ Collections.singletonList(comparisonColumn), null, hashFunction, null, false, mock(ServerMetrics.class));
Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
// Add the first segment
@@ -317,14 +513,15 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
int[] primaryKeys = new int[]{0, 1, 2};
int[] timestamps = new int[]{100, 120, 100};
ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
- ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
- upsertMetadataManager.addSegment(segment1, validDocIds1,
- getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegment(1, validDocIds1, null, getPrimaryKeyList(numRecords, primaryKeys));
+ upsertMetadataManager.addSegment(segment1, validDocIds1, null,
+ getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
// Update records from the second segment
ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
- MutableSegment segment2 = mockMutableSegment(1, validDocIds2);
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100)));
+ MutableSegment segment2 = mockMutableSegment(1, validDocIds2, null);
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
// segment2: 3 -> {0, 100}
@@ -335,7 +532,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120)));
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120), false));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
@@ -346,7 +543,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100)));
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100), false));
// segment1: 0 -> {0, 100}, 1 -> {1, 120}
// segment2: 2 -> {1, 120}, 3 -> {0, 100}
@@ -357,7 +554,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, new IntWrapper(100)));
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 3, new IntWrapper(100), false));
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
@@ -372,7 +569,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.stop();
// Add record should be no-op
- upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120)));
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120), false));
// segment1: 1 -> {1, 120}
// segment2: 0 -> {3, 100}, 2 -> {1, 120}, 3 -> {0, 100}
checkRecordLocation(recordLocationMap, 0, segment2, 3, 100, hashFunction);
@@ -386,6 +583,115 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
upsertMetadataManager.close();
}
+ @Test
+ public void testAddRecordWithDeleteColumn()
+ throws IOException {
+ verifyAddRecordWithDeleteColumn(HashFunction.NONE);
+ verifyAddRecordWithDeleteColumn(HashFunction.MD5);
+ verifyAddRecordWithDeleteColumn(HashFunction.MURMUR3);
+ }
+ private void verifyAddRecordWithDeleteColumn(HashFunction hashFunction)
+ throws IOException {
+ String comparisonColumn = "timeCol";
+ String deleteColumn = "deleteCol";
+ ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+ new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
+ Collections.singletonList(comparisonColumn), deleteColumn, hashFunction, null,
+ false, mock(ServerMetrics.class));
+ Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+ // queryableDocIds is the same as validDocIds in the absence of delete markers
+ // Add the first segment
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ int numRecords = 3;
+ int[] primaryKeys = new int[]{0, 1, 2};
+ int[] timestamps = new int[]{100, 120, 100};
+ ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds1 = new ThreadSafeMutableRoaringBitmap();
+ ImmutableSegmentImpl segment1 =
+ mockImmutableSegment(1, validDocIds1, queryableDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
+ upsertMetadataManager.addSegment(segment1, validDocIds1, queryableDocIds1,
+ getRecordInfoList(numRecords, primaryKeys, timestamps, null).iterator());
+
+ // Update records from the second segment
+ ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ ThreadSafeMutableRoaringBitmap queryableDocIds2 = new ThreadSafeMutableRoaringBitmap();
+ MutableSegment segment2 = mockMutableSegment(1, validDocIds2, queryableDocIds2);
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100), false));
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
+ // segment2: 3 -> {0, 100}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+ // Mark a record with the latest value in segment1 as deleted
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120), true));
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 2 -> {1, 120}, 3 -> {0, 100}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});
+
+ // Mark a record with the latest value in segment2 as deleted
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 2, new IntWrapper(150), true));
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 2 -> {1, 120}, 3 -> {2, 150}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 2, 150, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{});
+
+ // Revive a deleted primary key (by providing a larger comparisonValue)
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 3, new IntWrapper(200), false));
+
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 2 -> {1, 120}, 3 -> {3, 200}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});
+
+ // Stop the metadata manager
+ upsertMetadataManager.stop();
+
+ // Add record should be no-op
+ upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(0), 4, new IntWrapper(120), false));
+ // segment1: 0 -> {0, 100}, 1 -> {1, 120}
+ // segment2: 2 -> {1, 120}, 3 -> {3, 200}
+ checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
+ checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
+ checkRecordLocation(recordLocationMap, 3, segment2, 3, 200, hashFunction);
+ assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 3});
+ assertEquals(queryableDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
+ assertEquals(queryableDocIds2.getMutableRoaringBitmap().toArray(), new int[]{3});
+
+ // Close the metadata manager
+ upsertMetadataManager.close();
+ }
+
@Test
public void testHashPrimaryKey() {
PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
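The assertions in the new test above encode the intended delete semantics: a doc that wins the comparison stays in validDocIds even when it carries a delete marker, but only non-deleted docs enter queryableDocIds. A minimal sketch of that rule, for readers skimming the bitmaps (markDocId is a hypothetical helper, not the actual manager code):

    // Sketch only: how the two bitmaps are expected to diverge on a delete marker.
    private static void markDocId(ThreadSafeMutableRoaringBitmap validDocIds,
        @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, int docId, boolean deleteRecord) {
      validDocIds.add(docId);  // the doc still holds the latest value for its primary key
      if (queryableDocIds != null && !deleteRecord) {
        queryableDocIds.add(docId);  // delete markers are excluded from query results
      }
    }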
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 138ae2db1a..a848fd1247 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -1415,7 +1415,8 @@ public class TableConfigUtilsTest {
@Test
public void testValidateUpsertConfig() {
Schema schema =
- new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
.build();
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
TableConfig tableConfig =
@@ -1521,6 +1522,43 @@ public class TableConfigUtilsTest {
Assert.assertEquals(e.getMessage(),
"Metrics aggregation cannot be enabled in the Indexing Config and Ingestion Config at the same time");
}
+
+ // Table upsert with delete column
+ String incorrectTypeDelCol = "incorrectTypeDeleteCol";
+ String delCol = "myDelCol";
+ schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .setPrimaryKeyColumns(Lists.newArrayList("myPkCol"))
+ .addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+ .addSingleValueDimension(incorrectTypeDelCol, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(delCol, FieldSpec.DataType.BOOLEAN)
+ .build();
+ streamConfigs = getStreamConfigs();
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(incorrectTypeDelCol);
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ Assert.fail("Invalid delete column type (string) should have failed table creation");
+ } catch (IllegalStateException e) {
+ Assert.assertEquals(e.getMessage(), "The delete record column must be a single-valued BOOLEAN column");
+ }
+
+ upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
+ upsertConfig.setDeleteRecordColumn(delCol);
+ tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setStreamConfigs(streamConfigs)
+ .setUpsertConfig(upsertConfig)
+ .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+ .build();
+ try {
+ TableConfigUtils.validateUpsertAndDedupConfig(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.fail("Shouldn't fail table creation when delete column type is boolean.");
+ }
}
@Test
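The two new cases imply a type check on the configured delete column. A sketch of the validation they exercise, assuming it lives in TableConfigUtils#validateUpsertAndDedupConfig (the exact placement is not shown in this excerpt):

    // Sketch of the implied check; Preconditions is com.google.common.base.Preconditions.
    String deleteRecordColumn = upsertConfig.getDeleteRecordColumn();
    if (deleteRecordColumn != null) {
      FieldSpec fieldSpec = schema.getFieldSpecFor(deleteRecordColumn);
      Preconditions.checkState(
          fieldSpec != null && fieldSpec.isSingleValueField() && fieldSpec.getDataType() == FieldSpec.DataType.BOOLEAN,
          "The delete record column must be a single-valued BOOLEAN column");
    }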
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index de200e6be9..9c48a00efc 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -72,10 +72,21 @@ public interface IndexSegment {
*/
List<StarTreeV2> getStarTrees();
+ /**
+ * Returns a bitmap of the valid document ids. For upsert-enabled tables, a valid document is the one that holds
+ * the latest timestamp (or largest comparison value) for its primary key.
+ */
// TODO(upsert): solve the coordination problems of getting validDoc across segments for result consistency
@Nullable
ThreadSafeMutableRoaringBitmap getValidDocIds();
+ /**
+ * Returns a bitmap of the queryable document ids. For upsert-enabled tables, a queryable document is the one that
+ * holds the latest timestamp (or largest comparison value) for its primary key and is not marked as deleted.
+ */
+ @Nullable
+ ThreadSafeMutableRoaringBitmap getQueryableDocIds();
+
/**
* Returns the record for the given document id. Virtual column values are not returned.
* <p>NOTE: don't use this method for high performance code. Use PinotSegmentRecordReader when reading multiple
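A hedged example of how a caller might consume the new accessor (the fallback pattern is an assumption; upsert tables without a delete column return null here):

    // Prefer queryableDocIds (excludes delete markers) when available, else fall back to validDocIds.
    ThreadSafeMutableRoaringBitmap queryableDocIds = segment.getQueryableDocIds();
    ThreadSafeMutableRoaringBitmap docIds = queryableDocIds != null ? queryableDocIds : segment.getValidDocIds();
    if (docIds != null) {
      MutableRoaringBitmap snapshot = docIds.getMutableRoaringBitmap();  // point-in-time copy for this query
      // ... intersect 'snapshot' with the filter result before serving rows ...
    }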
diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 94983e6267..bb912dd9e0 100644
--- a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++ b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -293,7 +293,7 @@ public class TablesResourceTest extends BaseResourceTest {
for (int docId: docIds) {
validDocIds.add(docId);
}
- segment.enableUpsert(upsertMetadataManager, validDocIds);
+ segment.enableUpsert(upsertMetadataManager, validDocIds, null);
// Download the snapshot in byte[] format.
Response response = _webTarget.path(snapshotPath).request().get(Response.class);
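The extra null argument is the new queryableDocIds bitmap; this test has no delete column, so none is supplied. The signature implied by this call site (parameter names are assumptions; the ImmutableSegmentImpl change itself appears only in the file list of this commit):

    public void enableUpsert(PartitionUpsertMetadataManager upsertMetadataManager,
        ThreadSafeMutableRoaringBitmap validDocIds, @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds) { ... }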
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
index 41a5bfeb71..f16d2e67f9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/TableConfig.java
@@ -388,6 +388,12 @@ public class TableConfig extends BaseJsonConfig {
return _upsertConfig == null ? null : _upsertConfig.getComparisonColumns();
}
+ @JsonIgnore
+ @Nullable
+ public String getUpsertDeleteRecordColumn() {
+ return _upsertConfig == null ? null : _upsertConfig.getDeleteRecordColumn();
+ }
+
@JsonProperty(TUNER_CONFIG_LIST_KEY)
public List<TunerConfig> getTunerConfigsList() {
return _tunerConfigList;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
index f4ce363d00..c589b73a01 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java
@@ -54,6 +54,9 @@ public class UpsertConfig extends BaseJsonConfig {
@JsonPropertyDescription("Columns for upsert comparison, default to time column")
private List<String> _comparisonColumns;
+ @JsonPropertyDescription("Boolean column to indicate whether a records should be deleted")
+ private String _deleteRecordColumn;
+
@JsonPropertyDescription("Whether to use snapshot for fast upsert metadata recovery")
private boolean _enableSnapshot;
@@ -96,6 +99,11 @@ public class UpsertConfig extends BaseJsonConfig {
return _comparisonColumns;
}
+ @Nullable
+ public String getDeleteRecordColumn() {
+ return _deleteRecordColumn;
+ }
+
public boolean isEnableSnapshot() {
return _enableSnapshot;
}
@@ -154,6 +162,12 @@ public class UpsertConfig extends BaseJsonConfig {
}
}
+ public void setDeleteRecordColumn(String deleteRecordColumn) {
+ if (deleteRecordColumn != null) {
+ _deleteRecordColumn = deleteRecordColumn;
+ }
+ }
+
public void setEnableSnapshot(boolean enableSnapshot) {
_enableSnapshot = enableSnapshot;
}
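Putting the new setters together, a minimal table-config wiring for delete support (the table name "gameScores" and column name "deleted" are illustrative; the column must exist in the schema as a single-valued BOOLEAN, and a real upsert table also needs stream configs, a primary key, and routing not shown here):

    UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
    upsertConfig.setDeleteRecordColumn("deleted");  // hypothetical BOOLEAN column in the schema
    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME)
        .setTableName("gameScores")
        .setUpsertConfig(upsertConfig)
        .build();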
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org