You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/02/01 07:53:24 UTC
[pinot] branch master updated: Wire EmptySegmentPruner to routing config (#8067)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 71e28a2 Wire EmptySegmentPruner to routing config (#8067)
71e28a2 is described below
commit 71e28a2313a0e175e64398b195e488b0fd67d49b
Author: Liang Mingqiang <mi...@linkedin.com>
AuthorDate: Mon Jan 31 23:53:10 2022 -0800
Wire EmptySegmentPruner to routing config (#8067)
* Remove EmptySegmentPruner from the default path and wire it to the config
* fix unit test
* rename isKinesisEnabled as needsEmptySegmentPruner and move to TableConfigUtils
* add pinot-kinesis dependency to pinot-segment-local
* hardcode kinesis in TableConfigUtils to avoid pulling pinot-kinesis module as dependency
* Minor change on the comments
---
.../segmentpruner/SegmentPrunerFactory.java | 28 +++++--
.../routing/segmentpruner/SegmentPrunerTest.java | 85 ++++++++++++++--------
.../segment/local/utils/TableConfigUtils.java | 57 ++++++++++++++-
.../pinot/spi/config/table/RoutingConfig.java | 1 +
4 files changed, 131 insertions(+), 40 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 9837db3..b5adeba 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.segment.local.utils.TableConfigUtils;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
@@ -45,11 +46,14 @@ public class SegmentPrunerFactory {
public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
- RoutingConfig routingConfig = tableConfig.getRoutingConfig();
List<SegmentPruner> segmentPruners = new ArrayList<>();
- // Always prune out empty segments first
- segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+ boolean needsEmptySegment = TableConfigUtils.needsEmptySegmentPruner(tableConfig);
+ if (needsEmptySegment) {
+ // Add EmptySegmentPruner if needed
+ segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+ }
+ RoutingConfig routingConfig = tableConfig.getRoutingConfig();
if (routingConfig != null) {
List<String> segmentPrunerTypes = routingConfig.getSegmentPrunerTypes();
if (segmentPrunerTypes != null) {
@@ -61,7 +65,6 @@ public class SegmentPrunerFactory {
configuredSegmentPruners.add(partitionSegmentPruner);
}
}
-
if (RoutingConfig.TIME_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
TimeSegmentPruner timeSegmentPruner = getTimeSegmentPruner(tableConfig, propertyStore);
if (timeSegmentPruner != null) {
@@ -69,13 +72,16 @@ public class SegmentPrunerFactory {
}
}
}
+ // Sort all segment pruners in the order: empty -> time -> partition. This order is chosen to improve
+ // performance, but it may not be optimal -- ideally, the pruner that prunes the most segments should run
+ // first.
segmentPruners.addAll(sortSegmentPruners(configuredSegmentPruners));
} else {
// Handle legacy configs for backward-compatibility
TableType tableType = tableConfig.getTableType();
String routingTableBuilderName = routingConfig.getRoutingTableBuilderName();
- if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING
- .equalsIgnoreCase(routingTableBuilderName)) || (tableType == TableType.REALTIME
+ if ((tableType == TableType.OFFLINE && LEGACY_PARTITION_AWARE_OFFLINE_ROUTING.equalsIgnoreCase(
+ routingTableBuilderName)) || (tableType == TableType.REALTIME
&& LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) {
PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
if (partitionSegmentPruner != null) {
@@ -129,17 +135,23 @@ public class SegmentPrunerFactory {
}
private static List<SegmentPruner> sortSegmentPruners(List<SegmentPruner> pruners) {
- // If there's multiple pruners, move time range pruners to the front。
+ // If there are multiple pruners, always prune empty segments first, then prune based on time range, followed
+ // by the partition pruners.
// Partition pruner run time is proportional to input # of segments while time range pruner is not,
// Prune based on time range first will have a smaller input size for partition pruners, so have better performance.
List<SegmentPruner> sortedPruners = new ArrayList<>();
for (SegmentPruner pruner : pruners) {
+ if (pruner instanceof EmptySegmentPruner) {
+ sortedPruners.add(pruner);
+ }
+ }
+ for (SegmentPruner pruner : pruners) {
if (pruner instanceof TimeSegmentPruner) {
sortedPruners.add(pruner);
}
}
for (SegmentPruner pruner : pruners) {
- if (!(pruner instanceof TimeSegmentPruner)) {
+ if (pruner instanceof PartitionSegmentPruner) {
sortedPruners.add(pruner);
}
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index c09705d..f06798f 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -48,9 +48,11 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
@@ -95,6 +97,10 @@ public class SegmentPrunerTest extends ControllerTest {
private static final String SDF_QUERY_5 =
"SELECT * FROM testTable where timeColumn in (20200101, 20200102) AND timeColumn >= 20200530";
+ // Duplicates KinesisConfig.STREAM_TYPE. The value is hardcoded here instead of referencing
+ // KinesisConfig.STREAM_TYPE directly to avoid pulling in the entire pinot-kinesis module as a dependency.
+ private static final String KINESIS_STREAM_TYPE = "kinesis";
+
private ZkClient _zkClient;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
@@ -121,63 +127,53 @@ public class SegmentPrunerTest extends ControllerTest {
// Routing config is missing
List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Segment pruner type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Segment partition config is missing
when(routingConfig.getSegmentPrunerTypes()).thenReturn(
Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Column partition config is missing
Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
when(indexingConfig.getSegmentPartitionConfig()).thenReturn(new SegmentPartitionConfig(columnPartitionConfigMap));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Partition-aware segment pruner should be returned
columnPartitionConfigMap.put(PARTITION_COLUMN, new ColumnPartitionConfig("Modulo", 5));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 2);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
- assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
// Do not allow multiple partition columns
columnPartitionConfigMap.put("anotherPartitionColumn", new ColumnPartitionConfig("Modulo", 5));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Should be backward-compatible with legacy config
columnPartitionConfigMap.remove("anotherPartitionColumn");
when(routingConfig.getSegmentPrunerTypes()).thenReturn(null);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
SegmentPrunerFactory.LEGACY_PARTITION_AWARE_OFFLINE_ROUTING);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 2);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
- assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
+ assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
when(routingConfig.getRoutingTableBuilderName()).thenReturn(
SegmentPrunerFactory.LEGACY_PARTITION_AWARE_REALTIME_ROUTING);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 2);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
- assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
}
@Test
@@ -188,36 +184,63 @@ public class SegmentPrunerTest extends ControllerTest {
// Routing config is missing
List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Segment pruner type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Validation config is missing
when(routingConfig.getSegmentPrunerTypes()).thenReturn(
Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Time column is missing
SegmentsValidationAndRetentionConfig validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
when(tableConfig.getValidationConfig()).thenReturn(validationConfig);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertEquals(segmentPruners.size(), 0);
// Time range pruner should be returned
when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 2);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner);
+ }
+
+ @Test
+ public void testEnablingEmptySegmentPruner() {
+ TableConfig tableConfig = mock(TableConfig.class);
+ IndexingConfig indexingConfig = mock(IndexingConfig.class);
+ RoutingConfig routingConfig = mock(RoutingConfig.class);
+ IngestionConfig ingestionConfig = mock(IngestionConfig.class);
+ StreamIngestionConfig streamIngestionConfig = mock(StreamIngestionConfig.class);
+
+ // When routingConfig is configured with EmptySegmentPruner, EmptySegmentPruner should be returned.
+ when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
+ when(routingConfig.getSegmentPrunerTypes()).thenReturn(
+ Collections.singletonList(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE));
+ List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+
+ // When neither the routing config nor any stream config asks for it, no EmptySegmentPruner should be returned.
+ // Clearing the pruner types ensures the Kinesis cases below are decided by the stream configs alone.
+ when(routingConfig.getSegmentPrunerTypes()).thenReturn(Collections.emptyList());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 0);
+
+ // When indexingConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
+ when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
+ when(indexingConfig.getStreamConfigs()).thenReturn(
+ Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE));
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+
+ // When only streamIngestionConfig is configured with Kinesis streaming, EmptySegmentPruner should be returned.
+ // The indexing-config stream configs are cleared so this case exercises the ingestion-config path only.
+ when(indexingConfig.getStreamConfigs()).thenReturn(null);
+ when(tableConfig.getIngestionConfig()).thenReturn(ingestionConfig);
+ when(ingestionConfig.getStreamIngestionConfig()).thenReturn(streamIngestionConfig);
+ when(streamIngestionConfig.getStreamConfigMaps()).thenReturn(Collections.singletonList(
+ Collections.singletonMap(StreamConfigProperties.STREAM_TYPE, KINESIS_STREAM_TYPE)));
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
- assertTrue(segmentPruners.get(1) instanceof TimeSegmentPruner);
}
@DataProvider
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 60f6bda..d5cc083 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -53,12 +53,14 @@ import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfig;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -83,6 +85,10 @@ public final class TableConfigUtils {
// supported TableTaskTypes, must be identical to the one return in the impl of {@link PinotTaskGenerator}.
private static final String REALTIME_TO_OFFLINE_TASK_TYPE = "RealtimeToOfflineSegmentsTask";
+ // Duplicates KinesisConfig.STREAM_TYPE. The value is hardcoded here instead of referencing
+ // KinesisConfig.STREAM_TYPE directly to avoid pulling in the entire pinot-kinesis module as a dependency.
+ private static final String KINESIS_STREAM_TYPE = "kinesis";
+
/**
* @see TableConfigUtils#validate(TableConfig, Schema, String)
*/
@@ -107,7 +113,7 @@ public final class TableConfigUtils {
}
// Sanitize the table config before validation
sanitize(tableConfig);
- // skip all validation if skip type ALL is selected.
+ // skip all validation if skip type ALL is selected.
if (!skipTypes.contains(ValidationType.ALL)) {
validateValidationConfig(tableConfig, schema);
validateIngestionConfig(tableConfig, schema);
@@ -857,4 +863,53 @@ public final class TableConfigUtils {
/**
* Validation categories that can be supplied as skip types to table config validation. When {@code ALL} is among
* the skip types, all validation is skipped. NOTE(review): TASK and UPSERT presumably skip only the task and
* upsert validations respectively -- confirm against validate().
*/
public enum ValidationType {
ALL, TASK, UPSERT
}
+
+ /**
+ * Decides whether the EmptySegmentPruner should be enabled for the given table.
+ * <p>It is enabled when the table consumes from a Kinesis stream (in either the indexing config or the stream
+ * ingestion config), or when the routing config lists the "empty" segment pruner type (case-insensitive).
+ * @param tableConfig Input table config.
+ * @return {@code true} if the EmptySegmentPruner is needed, {@code false} otherwise.
+ */
+ public static boolean needsEmptySegmentPruner(TableConfig tableConfig) {
+ if (isKinesisConfigured(tableConfig)) {
+ return true;
+ }
+ RoutingConfig routingConfig = tableConfig.getRoutingConfig();
+ List<String> prunerTypes = (routingConfig == null) ? null : routingConfig.getSegmentPrunerTypes();
+ if (prunerTypes == null) {
+ return false;
+ }
+ // An empty list (or a list holding only null entries) never matches, so no separate isEmpty() check is needed.
+ return prunerTypes.stream().anyMatch(RoutingConfig.EMPTY_SEGMENT_PRUNER_TYPE::equalsIgnoreCase);
+ }
+
+ /**
+ * Checks whether the table is configured to consume from a Kinesis stream. Both the stream configs in the
+ * {@link IndexingConfig} and every stream config map in the {@link StreamIngestionConfig} are inspected.
+ * @param tableConfig Input table config.
+ * @return {@code true} if any inspected stream config has stream type "kinesis", {@code false} otherwise.
+ */
+ private static boolean isKinesisConfigured(TableConfig tableConfig) {
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+ if (indexingConfig != null && isKinesisStreamConfig(indexingConfig.getStreamConfigs())) {
+ return true;
+ }
+ IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+ if (ingestionConfig == null) {
+ return false;
+ }
+ StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig();
+ if (streamIngestionConfig == null) {
+ return false;
+ }
+ List<Map<String, String>> streamConfigMaps = streamIngestionConfig.getStreamConfigMaps();
+ // Guard against a stream ingestion config that carries no stream config maps (previously an NPE).
+ if (streamConfigMaps == null) {
+ return false;
+ }
+ for (Map<String, String> streamConfigMap : streamConfigMaps) {
+ if (isKinesisStreamConfig(streamConfigMap)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns whether the given stream config map declares the Kinesis stream type. A {@code null} map is treated as
+ * not Kinesis.
+ */
+ private static boolean isKinesisStreamConfig(Map<String, String> streamConfigMap) {
+ return streamConfigMap != null && KINESIS_STREAM_TYPE.equals(streamConfigMap.get(StreamConfigProperties.STREAM_TYPE));
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index 4a9127f..2c238aa 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -28,6 +28,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
public class RoutingConfig extends BaseJsonConfig {
public static final String PARTITION_SEGMENT_PRUNER_TYPE = "partition";
public static final String TIME_SEGMENT_PRUNER_TYPE = "time";
+ public static final String EMPTY_SEGMENT_PRUNER_TYPE = "empty";
public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "replicaGroup";
public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "strictReplicaGroup";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org