You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/18 19:35:24 UTC
(pinot) branch master updated: Add a check to enable size based threshold for realtime tables (#12016)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6aecd41641 Add a check to enable size based threshold for realtime tables (#12016)
6aecd41641 is described below
commit 6aecd4164189d9840011f4cf6344b4e5d44a3c00
Author: soumitra-st <12...@users.noreply.github.com>
AuthorDate: Sat Nov 18 11:35:17 2023 -0800
Add a check to enable size based threshold for realtime tables (#12016)
---
.../segment/local/utils/TableConfigUtils.java | 7 ++++
.../segment/local/utils/TableConfigUtilsTest.java | 49 ++++++++++++++++++++++
2 files changed, 56 insertions(+)
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 7605925432..6290f5f913 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -160,6 +160,13 @@ public final class TableConfigUtils {
throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e);
}
validateDecoder(streamConfig);
+ // If a size based flush threshold (segment size) is specified, the rows threshold must be 0.
+ if (streamConfigMap.containsKey(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE)
+ || streamConfigMap.containsKey(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE)) {
+ Preconditions.checkState(streamConfig.getFlushThresholdRows() == 0,
+ String.format("Invalid config: %s=%d, it must be set to 0 for size based segment threshold to work.",
+ StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, streamConfig.getFlushThresholdRows()));
+ }
}
validateTierConfigList(tableConfig.getTierConfigsList());
validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 4f691188f6..1b68bb8a8f 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -60,6 +60,7 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
@@ -694,6 +695,54 @@ public class TableConfigUtilsTest {
} catch (IllegalStateException e) {
// expected
}
+
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+ // When a size based threshold is specified, the default rows value is rejected; rows has to be explicitly set to 0.
+ streamConfigs = getKafkaStreamConfigs();
+ streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE);
+ streamConfigs.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE);
+ streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "100m");
+ streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+ ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
+ tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+ .setIngestionConfig(ingestionConfig).build();
+
+ try {
+ TableConfigUtils.validate(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("must be set to 0"));
+ }
+
+ // When size based threshold is specified, rows has to be set to 0.
+ streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "1000");
+ ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
+ tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+ .setIngestionConfig(ingestionConfig).build();
+
+ try {
+ TableConfigUtils.validate(tableConfig, schema);
+ Assert.fail();
+ } catch (IllegalStateException e) {
+ Assert.assertTrue(e.getMessage().contains("must be set to 0"));
+ }
+
+ // When size based threshold is specified and rows is explicitly set to 0, validation passes.
+ streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "0");
+ ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
+ tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+ .setIngestionConfig(ingestionConfig).build();
+
+ try {
+ TableConfigUtils.validate(tableConfig, schema);
+ } catch (IllegalStateException e) {
+ Assert.fail(e.getMessage());
+ }
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org