You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/02/01 07:53:24 UTC

[pinot] branch master updated: Wire EmptySegmentPruner to routing config (#8067)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 71e28a2  Wire EmptySegmentPruner to routing config (#8067)
71e28a2 is described below

commit 71e28a2313a0e175e64398b195e488b0fd67d49b
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Mon Jan 31 23:53:10 2022 -0800

    Wire EmptySegmentPruner to routing config (#8067)
    
    * Remove EmptySegmentPruner from the default path and wire it to the config
    
    * fix unit test
    
    * rename isKinesisEnabled as needsEmptySegmentPruner and move to TableConfigUtils
    
    * add pinot-kinesis dependency to pinot-segment-local
    
    * hardcode kinesis in TableConfigUtils to avoid pulling pinot-kinesis module as dependency
    
    * Minor change on the comments
---
 .../segmentpruner/SegmentPrunerFactory.java        | 28 +++++--
 .../routing/segmentpruner/SegmentPrunerTest.java   | 85 ++++++++++++++--------
 .../segment/local/utils/TableConfigUtils.java      | 57 ++++++++++++++-
 .../pinot/spi/config/table/RoutingConfig.java      |  1 +
 4 files changed, 131 insertions(+), 40 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 9837db3..b5adeba 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -45,11 +46,14 @@ public class SegmentPrunerFactory {
 
   public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
       ZkHelixPropertyStore<ZNRecord> propertyStore) {
-    RoutingConfig routingConfig = tableConfig.getRoutingConfig();
     List<SegmentPruner> segmentPruners = new ArrayList<>();
-    // Always prune out empty segments first
-    segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+    boolean needsEmptySegment = TableConfigUtils.needsEmptySegmentPruner(tableConfig);
+    if (needsEmptySegment) {
+      // Add EmptySegmentPruner if needed
+      segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+    }
 
+    RoutingConfig routingConfig = tableConfig.getRoutingConfig();
     if (routingConfig != null) {
       List<String> segmentPrunerTypes = routingConfig.getSegmentPrunerTypes();
       if (segmentPrunerTypes != null) {
@@ -61,7 +65,6 @@ public class SegmentPrunerFactory {
               configuredSegmentPruners.add(partitionSegmentPruner);
             }
           }
-
           if (RoutingConfig.TIME_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
             TimeSegmentPruner timeSegmentPruner = getTimeSegmentPruner(tableConfig, propertyStore);
             if (timeSegmentPruner != null) {
@@ -69,13 +72,16 @@ public class SegmentPrunerFactory {
             }
           }
         }
+        // Sort all segment pruners in the order of: empty -> time -> partition. We sort them in this order to
+        // improve performance; this order may not be optimal in every case -- ideally the pruner that will
+        // potentially prune the most segments should be moved to the front.
         segmentPruners.addAll(sortSegmentPruners(configuredSegmentPruners));
       } else {
         // Handle legacy configs for backward-compatibility
         TableType tableType = tableConfig.getTableType();
         String routingTableBuilderName = routingConfig.getRoutingTableBuilderName();
-        if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING
-            .equalsIgnoreCase(routingTableBuilderName)) || (tableType == TableType.REALTIME
+        if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase(
+            routingTableBuilderName)) || (tableType == TableType.REALTIME
             && LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) {
           PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
           if (partitionSegmentPruner != null) {
@@ -129,17 +135,23 @@ public class SegmentPrunerFactory {
   }
 
   private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> pruners) {
-    // If there's multiple pruners, move time range pruners to the front。
+    // If there are multiple pruners, always prune empty segments first. After that, prune based on time range,
+    // followed by partition pruners.
     // Partition pruner run time is proportional to input # of segments while time range pruner is not,
     // Prune based on time range first will have a smaller input size for partition pruners, so have better performance.
     List<SegmentPruner> sortedPruners = new ArrayList<>();
     for (SegmentPruner pruner : pruners) {
+      if (pruner instanceof EmptySegmentPruner) {
+        sortedPruners.add(pruner);
+      }
+    }
+    for (SegmentPruner pruner : pruners) {
       if (pruner instanceof TimeSegmentPruner) {
         sortedPruners.add(pruner);
       }
     }
     for (SegmentPruner pruner : pruners) {
-      if (!(pruner instanceof TimeSegmentPruner)) {
+      if (pruner instanceof PartitionSegmentPruner) {
         sortedPruners.add(pruner);
       }
     }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index c09705d..f06798f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -48,9 +48,11 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -95,6 +97,10 @@ public class SegmentPrunerTest extends ControllerTest {
   private static final String SDF_QUERY_5 =
       "SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";
 
+  // This duplicates KinesisConfig.STREAM_TYPE. Instead of using KinesisConfig.STREAM_TYPE directly, we hardcode
+  // the value here to avoid pulling in the entire pinot-kinesis module as a dependency.
+  private static final String KINESIS_STREAM_TYPE = "kinesis";
+
   private ZkClient _zkClient;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
 
@@ -121,63 +127,53 @@ public class SegmentPrunerTest extends ControllerTest {
 
     // Routing config is missing
     List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Segment pruner type is not configured
     RoutingConfig routingConfig = mock(RoutingConfig.class);
     when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Segment partition config is missing
     when(routingConfig.getSegmentPrunerTypes()).thenReturn(
         Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Column partition config is missing
     Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
     when(indexingConfig.getSegmentPartitionConfig()).thenReturn(new SegmentPartitionConfig(columnPartitionConfigMap));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Partition-aware segment pruner should be returned
     columnPartitionConfigMap.put(PARTITION_COLUMN, new ColumnPartitionConfig("Modulo", 5));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 2);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
-    assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
 
     // Do not allow multiple partition columns
     columnPartitionConfigMap.put("anotherPartitionColumn", new ColumnPartitionConfig("Modulo", 5));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Should be backward-compatible with legacy config
     columnPartitionConfigMap.remove("anotherPartitionColumn");
     when(routingConfig.getSegmentPrunerTypes()).thenReturn(null);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
     when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         SegmentPrunerFactory.LEGACY_PARTITION_AWARE_OFFLINE_ROUTING);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 2);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
-    assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
+    assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
     when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
     when(routingConfig.getRoutingTableBuilderName()).thenReturn(
         SegmentPrunerFactory.LEGACY_PARTITION_AWARE_REALTIME_ROUTING);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 2);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
-    assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
   }
 
   @Test
@@ -188,36 +184,63 @@ public class SegmentPrunerTest extends ControllerTest {
 
     // Routing config is missing
     List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Segment pruner type is not configured
     RoutingConfig routingConfig = mock(RoutingConfig.class);
     when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Validation config is missing
     when(routingConfig.getSegmentPrunerTypes()).thenReturn(
         Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Time column is missing
     SegmentsValidationAndRetentionConfig validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
     when(tableConfig.getValidationConfig()).thenReturn(validationConfig);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertEquals(segmentPruners.size(), 0);
 
     // Time range pruner should be returned
     when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 2);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner);
+  }
+
+  @Test
+  public void testEnablingEmptySegmentPruner() {
+    TableConfig tableConfig = mock(TableConfig.class);
+    IndexingConfig indexingConfig = mock(IndexingConfig.class);
+    RoutingConfig routingConfig = mock(RoutingConfig.class);
+    StreamIngestionConfig streamIngestionConfig = mock(StreamIngestionConfig.class);
+
+    // When routingConfig is configured with EmptySegmentPruner, EmptySegmentPruner should be returned.
+    when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
+    when(routingConfig.getSegmentPrunerTypes()).thenReturn(
+        Collections.singletonList(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
+    List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+
+    // When indexingConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
+    when(indexingConfig.getStreamConfigs()).thenReturn(
+        Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+
+    // When streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
+    when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList(
+        Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
+    when(indexingConfig.getStreamConfigs()).thenReturn(
+        Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
     assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
-    assertTrue(segmentPruners.get(1) instanceof TimeSegmentPruner);
   }
 
   @DataProvider
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 60f6bda..d5cc083 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -53,12 +53,14 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.ingestion.batch.BatchConfig;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.DataSizeUtils;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -83,6 +85,10 @@ public final class TableConfigUtils {
   // supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}.
   private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask";
 
+  // This duplicates KinesisConfig.STREAM_TYPE. Instead of using KinesisConfig.STREAM_TYPE directly, we hardcode
+  // the value here to avoid pulling in the entire pinot-kinesis module as a dependency.
+  private static final String KINESIS_STREAM_TYPE = "kinesis";
+
   /**
    * @see TableConfigUtils#validate(TableConfig, Schema, String)
    */
@@ -107,7 +113,7 @@ public final class TableConfigUtils {
     }
     // Sanitize the table config before validation
     sanitize(tableConfig);
-    // skip all validation if skip type ALL is selected. 
+    // skip all validation if skip type ALL is selected.
     if (!skipTypes.contains(ValidationType.ALL)) {
       validateValidationConfig(tableConfig, schema);
       validateIngestionConfig(tableConfig, schema);
@@ -857,4 +863,53 @@ public final class TableConfigUtils {
   public enum ValidationType {
     ALL, TASK, UPSERT
   }
+
+  /**
+   * Returns true if the table is configured with a Kinesis stream or its routing config lists the "empty" pruner type.
+   * @param tableConfig Input table config.
+   */
+  public static boolean needsEmptySegmentPruner(TableConfig tableConfig) {
+    if (isKinesisConfigured(tableConfig)) {
+      return true;
+    }
+    RoutingConfig routingConfig = tableConfig.getRoutingConfig();
+    if (routingConfig == null) {
+      return false;
+    }
+    List<String> segmentPrunerTypes = routingConfig.getSegmentPrunerTypes();
+    if (segmentPrunerTypes == null || segmentPrunerTypes.isEmpty()) {
+      return false;
+    }
+    for (String segmentPrunerType : segmentPrunerTypes) {
+      if (RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private static boolean isKinesisConfigured(TableConfig tableConfig) {
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); // 1) stream configs on the indexing config
+    if (indexingConfig != null) {
+      Map<String, String> streamConfig = indexingConfig.getStreamConfigs();
+      if (streamConfig != null && KINESIS_STREAM_TYPE.equals(
+          streamConfig.get(StreamConfigProperties.STREAM_TYPE))) {
+        return true;
+      }
+    }
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); // 2) stream ingestion config maps
+    if (ingestionConfig == null) {
+      return false;
+    }
+    StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig();
+    if (streamIngestionConfig == null) {
+      return false;
+    }
+    for (Map<String, String> config : streamIngestionConfig.getStreamConfigMaps()) { // NOTE(review): assumes non-null
+      if (config != null && KINESIS_STREAM_TYPE.equals(config.get(StreamConfigProperties.STREAM_TYPE))) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 4a9127f..2c238aa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -28,6 +28,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
 public class RoutingConfig extends BaseJsonConfig {
   public static final String PARTITION_SEGMENT_PRUNER_TYPE = "partition";
   public static final String TIME_SEGMENT_PRUNER_TYPE = "time";
+  public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
   public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "replicaGroup";
   public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "strictReplicaGroup";
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org