You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/18 19:35:24 UTC

(pinot) branch master updated: Add a check to enable size based threshold for realtime tables (#12016)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6aecd41641 Add a check to enable size based threshold for realtime tables (#12016)
6aecd41641 is described below

commit 6aecd4164189d9840011f4cf6344b4e5d44a3c00
Author: soumitra-st <12...@users.noreply.github.com>
AuthorDate: Sat Nov 18 11:35:17 2023 -0800

    Add a check to enable size based threshold for realtime tables (#12016)
---
 .../segment/local/utils/TableConfigUtils.java      |  7 ++++
 .../segment/local/utils/TableConfigUtilsTest.java  | 49 ++++++++++++++++++++++
 2 files changed, 56 insertions(+)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 7605925432..6290f5f913 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -160,6 +160,13 @@ public final class TableConfigUtils {
           throw new IllegalStateException("Could not create StreamConfig using the streamConfig map", e);
         }
         validateDecoder(streamConfig);
+        // if segmentSizeBytes is specified, rows must be zero.
+        if (streamConfigMap.containsKey(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE)
+            || streamConfigMap.containsKey(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE)) {
+          Preconditions.checkState(streamConfig.getFlushThresholdRows() == 0,
+              String.format("Invalid config: %s=%d, it must be set to 0 for size based segment threshold to work.",
+                  StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, streamConfig.getFlushThresholdRows()));
+        }
       }
       validateTierConfigList(tableConfig.getTierConfigsList());
       validateIndexingConfig(tableConfig.getIndexingConfig(), schema);
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 4f691188f6..1b68bb8a8f 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -60,6 +60,7 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.mockito.Mockito;
@@ -694,6 +695,54 @@ public class TableConfigUtilsTest {
     } catch (IllegalStateException e) {
       // expected
     }
+
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+
+    // When size based threshold is specified, default rows does not work, it has to be explicitly set to 0.
+    streamConfigs = getKafkaStreamConfigs();
+    streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE);
+    streamConfigs.remove(StreamConfigProperties.DEPRECATED_SEGMENT_FLUSH_DESIRED_SIZE);
+    streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "100m");
+    streamConfigs.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
+    tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("must be set to 0"));
+    }
+
+    // When size based threshold is specified, rows has to be set to 0.
+    streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "1000");
+    ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
+    tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+      Assert.fail();
+    } catch (IllegalStateException e) {
+      Assert.assertTrue(e.getMessage().contains("must be set to 0"));
+    }
+
+    // When size based threshold is specified, rows has to be set to 0.
+    streamConfigs.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, "0");
+    ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(streamConfigs)));
+    tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
+
+    try {
+      TableConfigUtils.validate(tableConfig, schema);
+    } catch (IllegalStateException e) {
+      Assert.fail(e.getMessage());
+    }
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org