You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/17 20:51:14 UTC
[incubator-pinot] branch master updated: Handle creation of
segments with 0 rows (#6466)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 60b0c5f Handle creation of segments with 0 rows (#6466)
60b0c5f is described below
commit 60b0c5f2dce5e82c7b1013a3b8d8935774d12d72
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Feb 17 12:50:58 2021 -0800
Handle creation of segments with 0 rows (#6466)
* Handle creation of segments with 0 rows
* Broker and server side pruners for empty segment
* Set current time as start/end if 0 totalDocs. Handle in time boundary
* EmptyIndexSegment and EmptyDataSource
* Make ValidSegmentPruner as first pruner
---
.../routing/segmentpruner/EmptySegmentPruner.java | 130 ++++++++++++++
.../segmentpruner/SegmentPrunerFactory.java | 16 +-
.../routing/timeboundary/TimeBoundaryManager.java | 19 +-
.../routing/segmentpruner/SegmentPrunerTest.java | 195 ++++++++++++++++++---
.../timeboundary/TimeBoundaryManagerTest.java | 19 ++
.../realtime/PinotLLCRealtimeSegmentManager.java | 15 +-
.../controller/utils/SegmentMetadataMockUtils.java | 6 +-
.../data/readers/PinotSegmentRecordReader.java | 54 +++---
.../indexsegment/immutable/EmptyIndexSegment.java | 120 +++++++++++++
.../immutable/ImmutableSegmentLoader.java | 3 +
.../core/query/pruner/SegmentPrunerService.java | 17 +-
.../converter/RealtimeSegmentRecordReader.java | 3 +-
.../creator/impl/SegmentColumnarIndexCreator.java | 53 ++++--
.../impl/SegmentIndexCreationDriverImpl.java | 15 +-
.../segment/index/datasource/EmptyDataSource.java | 94 ++++++++++
.../data/readers/RecordReaderSampleDataTest.java | 12 ++
.../SegmentGenerationWithNoRecordsTest.java | 117 +++++++++++++
.../pinot/query/executor/QueryExecutorTest.java | 27 ++-
.../converter/RealtimeSegmentConverterTest.java | 103 ++++++++++-
.../segments/v1/creator/SegmentTestUtils.java | 10 ++
.../src/test/resources/data/test_empty_data.json | 0
21 files changed, 930 insertions(+), 98 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
new file mode 100644
index 0000000..8e45331
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentpruner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The {@code EmptySegmentPruner} prunes segments if they have 0 totalDocs.
+ * Does not prune segments with -1 total docs (that can be either error case or CONSUMING segment)
+ */
+public class EmptySegmentPruner implements SegmentPruner {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EmptySegmentPruner.class);
+
+ private final String _tableNameWithType;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ private final String _segmentZKMetadataPathPrefix;
+
+ private final Map<String, Long> _segmentTotalDocsMap = new HashMap<>();
+ private final Set<String> _emptySegments = new HashSet<>();
+
+ public EmptySegmentPruner(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ _tableNameWithType = tableConfig.getTableName();
+ _propertyStore = propertyStore;
+ _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + "/";
+ }
+
+ @Override
+ public void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
+ // Bulk load info for all online segments
+ int numSegments = onlineSegments.size();
+ List<String> segments = new ArrayList<>(numSegments);
+ List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
+ for (String segment : onlineSegments) {
+ segments.add(segment);
+ segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
+ }
+ List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
+ for (int i = 0; i < numSegments; i++) {
+ String segment = segments.get(i);
+ long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment, znRecords.get(i));
+ _segmentTotalDocsMap.put(segment, totalDocs);
+ if (totalDocs == 0) {
+ _emptySegments.add(segment);
+ }
+ }
+ }
+
+ private long extractTotalDocsFromSegmentZKMetaZNRecord(String segment, @Nullable ZNRecord znRecord) {
+ if (znRecord == null) {
+ LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType);
+ return -1;
+ }
+ return znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
+ }
+
+ @Override
+ public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
+ Set<String> onlineSegments) {
+ // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
+ // ones. The refreshed segment ZK metadata change won't be picked up.
+ for (String segment : onlineSegments) {
+ _segmentTotalDocsMap.computeIfAbsent(segment, k -> {
+ long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(k,
+ _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, AccessOption.PERSISTENT));
+ if (totalDocs == 0) {
+ _emptySegments.add(segment);
+ }
+ return totalDocs;
+ });
+ }
+ _segmentTotalDocsMap.keySet().retainAll(onlineSegments);
+ _emptySegments.retainAll(onlineSegments);
+ }
+
+ @Override
+ public synchronized void refreshSegment(String segment) {
+ long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment,
+ _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT));
+ _segmentTotalDocsMap.put(segment, totalDocs);
+ if (totalDocs == 0) {
+ _emptySegments.add(segment);
+ } else {
+ _emptySegments.remove(segment);
+ }
+ }
+
+ /**
+ * Prune out segments which are empty
+ */
+ @Override
+ public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
+ Set<String> selectedSegments = new HashSet<>(segments);
+ selectedSegments.removeAll(_emptySegments);
+ return selectedSegments;
+ }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 41137e9..2a61927 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -47,26 +47,30 @@ public class SegmentPrunerFactory {
public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
RoutingConfig routingConfig = tableConfig.getRoutingConfig();
+ List<SegmentPruner> segmentPruners = new ArrayList<>();
+ // Always prune out empty segments first
+ segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+
if (routingConfig != null) {
List<String> segmentPrunerTypes = routingConfig.getSegmentPrunerTypes();
if (segmentPrunerTypes != null) {
- List<SegmentPruner> segmentPruners = new ArrayList<>(segmentPrunerTypes.size());
+ List<SegmentPruner> configuredSegmentPruners = new ArrayList<>(segmentPrunerTypes.size());
for (String segmentPrunerType : segmentPrunerTypes) {
if (RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
if (partitionSegmentPruner != null) {
- segmentPruners.add(partitionSegmentPruner);
+ configuredSegmentPruners.add(partitionSegmentPruner);
}
}
if (RoutingConfig.TIME_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
TimeSegmentPruner timeSegmentPruner = getTimeSegmentPruner(tableConfig, propertyStore);
if (timeSegmentPruner != null) {
- segmentPruners.add(timeSegmentPruner);
+ configuredSegmentPruners.add(timeSegmentPruner);
}
}
}
- return sortSegmentPruners(segmentPruners);
+ segmentPruners.addAll(sortSegmentPruners(configuredSegmentPruners));
} else {
// Handle legacy configs for backward-compatibility
TableType tableType = tableConfig.getTableType();
@@ -76,12 +80,12 @@ public class SegmentPrunerFactory {
&& LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) {
PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
if (partitionSegmentPruner != null) {
- return Collections.singletonList(getPartitionSegmentPruner(tableConfig, propertyStore));
+ segmentPruners.add(getPartitionSegmentPruner(tableConfig, propertyStore));
}
}
}
}
- return Collections.emptyList();
+ return segmentPruners;
}
@Nullable
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index 613b755..ddd5dbb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -125,15 +125,18 @@ public class TimeBoundaryManager {
LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _offlineTableName);
return INVALID_END_TIME_MS;
}
-
- long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
- if (endTime > 0) {
- TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
- return timeUnit.toMillis(endTime);
- } else {
- LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
- return INVALID_END_TIME_MS;
+ long totalDocs = znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
+ long endTimeMs = INVALID_END_TIME_MS;
+ if (totalDocs != 0) {
+ long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
+ if (endTime > 0) {
+ TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
+ endTimeMs = timeUnit.toMillis(endTime);
+ } else {
+ LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
+ }
}
+ return endTimeMs;
}
private void updateTimeBoundaryInfo(long maxEndTimeMs) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index da3744a..a639709 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -117,49 +117,64 @@ public class SegmentPrunerTest {
when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
// Routing config is missing
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Segment pruner type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Segment partition config is missing
when(routingConfig.getSegmentPrunerTypes())
.thenReturn(Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Column partition config is missing
Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
when(indexingConfig.getSegmentPartitionConfig()).thenReturn(new SegmentPartitionConfig(columnPartitionConfigMap));
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Partition-aware segment pruner should be returned
columnPartitionConfigMap.put(PARTITION_COLUMN, new ColumnPartitionConfig("Modulo", 5));
- List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 2);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
// Do not allow multiple partition columns
columnPartitionConfigMap.put("anotherPartitionColumn", new ColumnPartitionConfig("Modulo", 5));
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Should be backward-compatible with legacy config
columnPartitionConfigMap.remove("anotherPartitionColumn");
when(routingConfig.getSegmentPrunerTypes()).thenReturn(null);
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
when(routingConfig.getRoutingTableBuilderName())
.thenReturn(SegmentPrunerFactory.LEGACY_PARTITION_AWARE_OFFLINE_ROUTING);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
+ assertEquals(segmentPruners.size(), 2);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
when(routingConfig.getRoutingTableBuilderName())
.thenReturn(SegmentPrunerFactory.LEGACY_PARTITION_AWARE_REALTIME_ROUTING);
segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
+ assertEquals(segmentPruners.size(), 2);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
}
@Test
@@ -169,29 +184,37 @@ public class SegmentPrunerTest {
setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.HOURS);
// Routing config is missing
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Segment pruner type is not configured
RoutingConfig routingConfig = mock(RoutingConfig.class);
when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Validation config is missing
when(routingConfig.getSegmentPrunerTypes())
.thenReturn(Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Time column is missing
SegmentsValidationAndRetentionConfig validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
when(tableConfig.getValidationConfig()).thenReturn(validationConfig);
- assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 1);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
// Time range pruner should be returned
when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN);
- List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
- assertEquals(segmentPruners.size(), 1);
- assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner);
-
+ segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+ assertEquals(segmentPruners.size(), 2);
+ assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+ assertTrue(segmentPruners.get(1) instanceof TimeSegmentPruner);
}
@Test
@@ -460,6 +483,128 @@ public class SegmentPrunerTest {
assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet());
}
+ @Test
+ public void testEmptySegmentPruner() {
+ CalciteSqlCompiler sqlCompiler = new CalciteSqlCompiler();
+ BrokerRequest brokerRequest1 = sqlCompiler.compileToBrokerRequest(QUERY_1);
+ BrokerRequest brokerRequest2 = sqlCompiler.compileToBrokerRequest(QUERY_2);
+ BrokerRequest brokerRequest3 = sqlCompiler.compileToBrokerRequest(QUERY_3);
+
+ ExternalView externalView = Mockito.mock(ExternalView.class);
+ IdealState idealState = Mockito.mock(IdealState.class);
+
+ TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
+
+ // init with list of segments
+ EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore);
+ Set<String> onlineSegments = new HashSet<>();
+ String segment0 = "segment0";
+ onlineSegments.add(segment0);
+ setRealtimeSegmentZKTotalDocsMetadata(segment0, 10);
+ String segment1 = "segment1";
+ onlineSegments.add(segment1);
+ setRealtimeSegmentZKTotalDocsMetadata(segment1, 0);
+ segmentPruner.init(externalView, idealState, onlineSegments);
+ assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))),
+ new HashSet<>(Collections.singletonList(segment0)));
+ assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))),
+ new HashSet<>(Collections.singletonList(segment0)));
+ assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))),
+ new HashSet<>(Collections.singletonList(segment0)));
+
+ // init with empty list of segments
+ segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore);
+ onlineSegments.clear();
+ segmentPruner.init(externalView, idealState, onlineSegments);
+ assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet());
+ assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet());
+
+ // Segments without metadata (not updated yet) should not be pruned
+ String newSegment = "newSegment";
+ onlineSegments.add(newSegment);
+ segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+ assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(newSegment))),
+ Collections.singletonList(newSegment));
+ assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Collections.singletonList(newSegment))),
+ Collections.singletonList(newSegment));
+ assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Collections.singletonList(newSegment))),
+ Collections.singletonList(newSegment));
+
+ // Segments without totalDocs metadata should not be pruned
+ onlineSegments.clear();
+ String segmentWithoutTotalDocsMetadata = "segmentWithoutTotalDocsMetadata";
+ onlineSegments.add(segmentWithoutTotalDocsMetadata);
+ RealtimeSegmentZKMetadata segmentZKMetadataWithoutTotalDocsMetadata = new RealtimeSegmentZKMetadata();
+ segmentZKMetadataWithoutTotalDocsMetadata.setSegmentName(segmentWithoutTotalDocsMetadata);
+ segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+ ZKMetadataProvider
+ .setRealtimeSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, segmentZKMetadataWithoutTotalDocsMetadata);
+ segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+ assertEquals(
+ segmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithoutTotalDocsMetadata))),
+ Collections.singletonList(segmentWithoutTotalDocsMetadata));
+ assertEquals(
+ segmentPruner.prune(brokerRequest2, new HashSet<>(Collections.singletonList(segmentWithoutTotalDocsMetadata))),
+ Collections.singletonList(segmentWithoutTotalDocsMetadata));
+ assertEquals(
+ segmentPruner.prune(brokerRequest3, new HashSet<>(Collections.singletonList(segmentWithoutTotalDocsMetadata))),
+ Collections.singletonList(segmentWithoutTotalDocsMetadata));
+
+ // Segments with -1 totalDocs should not be pruned
+ onlineSegments.clear();
+ String segmentWithNegativeTotalDocsMetadata = "segmentWithNegativeTotalDocsMetadata";
+ onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
+ setRealtimeSegmentZKTotalDocsMetadata(segmentWithNegativeTotalDocsMetadata, -1);
+ segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+ assertEquals(segmentPruner
+ .prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithNegativeTotalDocsMetadata))),
+ Collections.singletonList(segmentWithNegativeTotalDocsMetadata));
+ assertEquals(segmentPruner
+ .prune(brokerRequest2, new HashSet<>(Collections.singletonList(segmentWithNegativeTotalDocsMetadata))),
+ Collections.singletonList(segmentWithNegativeTotalDocsMetadata));
+ assertEquals(segmentPruner
+ .prune(brokerRequest3, new HashSet<>(Collections.singletonList(segmentWithNegativeTotalDocsMetadata))),
+ Collections.singletonList(segmentWithNegativeTotalDocsMetadata));
+
+ // Prune segments with 0 total docs
+ onlineSegments.clear();
+ onlineSegments.add(segment0);
+ setRealtimeSegmentZKTotalDocsMetadata(segment0, 10);
+ onlineSegments.add(segment1);
+ setRealtimeSegmentZKTotalDocsMetadata(segment1, 0);
+ String segment2 = "segment2";
+ onlineSegments.add(segment2);
+ setRealtimeSegmentZKTotalDocsMetadata(segment2, -1);
+
+ segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+ assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Arrays.asList(segment0, segment2)));
+
+ // Update metadata without external view change or refreshing should have no effect
+ setSegmentZKTimeRangeMetadata(segment2, 20, 30, TimeUnit.DAYS);
+ setRealtimeSegmentZKTotalDocsMetadata(segment2, 0);
+ assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Arrays.asList(segment0, segment2)));
+ assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Arrays.asList(segment0, segment2)));
+
+ // Refresh the changed segment should update the segment pruner
+ segmentPruner.refreshSegment(segment2);
+ assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Collections.singletonList(segment0)));
+ assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Collections.singletonList(segment0)));
+ assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+ new HashSet<>(Collections.singletonList(segment0)));
+ }
+
private TableConfig getTableConfig(String rawTableName, TableType type) {
return new TableConfigBuilder(type).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN).build();
}
@@ -493,4 +638,12 @@ public class SegmentPrunerTest {
realtimeSegmentZKMetadata.setTimeUnit(unit);
ZKMetadataProvider.setRealtimeSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, realtimeSegmentZKMetadata);
}
+
+ private void setRealtimeSegmentZKTotalDocsMetadata(String segment, long totalDocs) {
+ RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = new RealtimeSegmentZKMetadata();
+ realtimeSegmentZKMetadata.setSegmentName(segment);
+ realtimeSegmentZKMetadata.setTotalDocs(totalDocs);
+ realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+ ZKMetadataProvider.setRealtimeSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, realtimeSegmentZKMetadata);
+ }
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index 00ca6c7..345968a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -125,6 +125,13 @@ public class TimeBoundaryManagerTest {
timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
+ // Add new segment with larger end time but 0 total docs, should not update time boundary
+ String segmentEmpty = "segmentEmpty";
+ onlineSegments.add(segmentEmpty);
+ setSegmentZKMetadataWithTotalDocs(rawTableName, segmentEmpty, 6, timeUnit, 0);
+ timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
+ verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
+
// Add a new segment with smaller end time should not change the time boundary
String segment2 = "segment2";
onlineSegments.add(segment2);
@@ -194,6 +201,18 @@ public class TimeBoundaryManagerTest {
offlineSegmentZKMetadata);
}
+ private void setSegmentZKMetadataWithTotalDocs(String rawTableName, String segment, int endTimeInDays,
+ TimeUnit timeUnit, long totalDocs) {
+ OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
+ offlineSegmentZKMetadata.setSegmentName(segment);
+ offlineSegmentZKMetadata.setEndTime(timeUnit.convert(endTimeInDays, TimeUnit.DAYS));
+ offlineSegmentZKMetadata.setTimeUnit(timeUnit);
+ offlineSegmentZKMetadata.setTotalDocs(totalDocs);
+ ZKMetadataProvider
+ .setOfflineSegmentZKMetadata(_propertyStore, TableNameBuilder.OFFLINE.tableNameWithType(rawTableName),
+ offlineSegmentZKMetadata);
+ }
+
private void verifyTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo, long expectedTimeValue) {
assertNotNull(timeBoundaryInfo);
assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 42d50d0..ece2c40 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -507,10 +507,17 @@ public class PinotLLCRealtimeSegmentManager {
committingSegmentZKMetadata.setDownloadUrl(isPeerURL(committingSegmentDescriptor.getSegmentLocation())
? CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD : committingSegmentDescriptor.getSegmentLocation());
committingSegmentZKMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc()));
- Preconditions.checkNotNull(segmentMetadata.getTimeInterval(),
- "start/end time information is not correctly written to the segment for table: " + realtimeTableName);
- committingSegmentZKMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
- committingSegmentZKMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
+ if (segmentMetadata.getTotalDocs() > 0) {
+ Preconditions.checkNotNull(segmentMetadata.getTimeInterval(),
+ "start/end time information is not correctly written to the segment for table: " + realtimeTableName);
+ committingSegmentZKMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
+ committingSegmentZKMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
+ } else {
+ // Set current time as start/end time if total docs is 0
+ long now = System.currentTimeMillis();
+ committingSegmentZKMetadata.setStartTime(now);
+ committingSegmentZKMetadata.setEndTime(now);
+ }
committingSegmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
committingSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
committingSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index a2e27c1..9b99813 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -53,12 +53,12 @@ public class SegmentMetadataMockUtils {
public static SegmentMetadata mockSegmentMetadata(String tableName) {
String uniqueNumericString = Long.toString(System.nanoTime());
- return mockSegmentMetadata(tableName, tableName + uniqueNumericString, 0, uniqueNumericString);
+ return mockSegmentMetadata(tableName, tableName + uniqueNumericString, 100, uniqueNumericString);
}
public static SegmentMetadata mockSegmentMetadata(String tableName, String segmentName) {
String uniqueNumericString = Long.toString(System.nanoTime());
- return mockSegmentMetadata(tableName, segmentName, 0, uniqueNumericString);
+ return mockSegmentMetadata(tableName, segmentName, 100, uniqueNumericString);
}
public static RealtimeSegmentZKMetadata mockRealtimeSegmentZKMetadata(String tableName, String segmentName,
@@ -90,7 +90,7 @@ public class SegmentMetadataMockUtils {
SegmentMetadata segmentMetadata = Mockito.mock(SegmentMetadata.class);
Mockito.when(segmentMetadata.getTableName()).thenReturn(tableName);
Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
- Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(0);
+ Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(10);
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
Mockito.when(segmentMetadata.getPushTime()).thenReturn(Long.MIN_VALUE);
Mockito.when(segmentMetadata.getRefreshTime()).thenReturn(Long.MIN_VALUE);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index 312cda7..c3c2db6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -75,34 +76,37 @@ public class PinotSegmentRecordReader implements RecordReader {
try {
SegmentMetadata segmentMetadata = _immutableSegment.getSegmentMetadata();
_numDocs = segmentMetadata.getTotalDocs();
- if (schema == null) {
- // In order not to expose virtual columns to client, schema shouldn't be fetched from segmentMetadata;
- // otherwise the original metadata will be modified. Hence, initialize a new schema.
- _schema = new SegmentMetadataImpl(indexDir).getSchema();
- Collection<String> columnNames = _schema.getColumnNames();
- _columnReaderMap = new HashMap<>(columnNames.size());
- for (String columnName : columnNames) {
- _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+ // In order not to expose virtual columns to client, schema shouldn't be fetched from segmentMetadata;
+ // otherwise the original metadata will be modified. Hence, initialize a new schema.
+ _schema = schema == null ? new SegmentMetadataImpl(indexDir).getSchema() : schema;
+ if (_numDocs > 0) {
+ _columnReaderMap = new HashMap<>();
+ if (schema == null) {
+ Collection<String> columnNames = _schema.getColumnNames();
+ for (String columnName : columnNames) {
+ _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+ }
+ } else {
+ Schema segmentSchema = segmentMetadata.getSchema();
+ Collection<FieldSpec> fieldSpecs = _schema.getAllFieldSpecs();
+ for (FieldSpec fieldSpec : fieldSpecs) {
+ String columnName = fieldSpec.getName();
+ FieldSpec segmentFieldSpec = segmentSchema.getFieldSpecFor(columnName);
+ Preconditions.checkState(fieldSpec.equals(segmentFieldSpec),
+ "Field spec mismatch for column: %s, in the given schema: %s, in the segment schema: %s", columnName,
+ fieldSpec, segmentFieldSpec);
+ _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+ }
}
- } else {
- _schema = schema;
- Schema segmentSchema = segmentMetadata.getSchema();
- Collection<FieldSpec> fieldSpecs = _schema.getAllFieldSpecs();
- _columnReaderMap = new HashMap<>(fieldSpecs.size());
- for (FieldSpec fieldSpec : fieldSpecs) {
- String columnName = fieldSpec.getName();
- FieldSpec segmentFieldSpec = segmentSchema.getFieldSpecFor(columnName);
- Preconditions.checkState(fieldSpec.equals(segmentFieldSpec),
- "Field spec mismatch for column: %s, in the given schema: %s, in the segment schema: %s", columnName,
- fieldSpec, segmentFieldSpec);
- _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+ // Initialize sorted doc ids
+ if (sortOrder != null && !sortOrder.isEmpty()) {
+ _docIdsInSortedColumnOrder =
+ new PinotSegmentSorter(_numDocs, _schema, _columnReaderMap).getSortedDocIds(sortOrder);
+ } else {
+ _docIdsInSortedColumnOrder = null;
}
- }
- // Initialize sorted doc ids
- if (sortOrder != null && !sortOrder.isEmpty()) {
- _docIdsInSortedColumnOrder =
- new PinotSegmentSorter(_numDocs, _schema, _columnReaderMap).getSortedDocIds(sortOrder);
} else {
+ _columnReaderMap = Collections.emptyMap();
_docIdsInSortedColumnOrder = null;
}
} catch (Exception e) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/EmptyIndexSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/EmptyIndexSegment.java
new file mode 100644
index 0000000..a11dc46
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/EmptyIndexSegment.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.immutable;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.segment.index.datasource.EmptyDataSource;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Immutable segment implementation for an empty segment (0 rows).
+ * Such an IndexSegment contains only the segment metadata and no indexes.
+ */
+public class EmptyIndexSegment implements ImmutableSegment {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EmptyIndexSegment.class);
+
+ private final SegmentMetadataImpl _segmentMetadata;
+
+ public EmptyIndexSegment(SegmentMetadataImpl segmentMetadata) {
+ _segmentMetadata = segmentMetadata;
+ }
+
+ @Override
+ public String getSegmentName() {
+ return _segmentMetadata.getName();
+ }
+
+ @Override
+ public SegmentMetadataImpl getSegmentMetadata() {
+ return _segmentMetadata;
+ }
+
+ @Override
+ public DataSource getDataSource(String column) {
+ ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
+ Preconditions.checkNotNull(columnMetadata,
+ "ColumnMetadata for " + column + " should not be null. " + "Potentially invalid column name specified.");
+ return new EmptyDataSource(columnMetadata);
+ }
+
+ @Override
+ public Set<String> getColumnNames() {
+ return _segmentMetadata.getSchema().getColumnNames();
+ }
+
+ @Override
+ public Set<String> getPhysicalColumnNames() {
+ return _segmentMetadata.getSchema().getPhysicalColumnNames();
+ }
+
+ @Override
+ public void destroy() {
+ }
+
+ @Override
+ public List<StarTreeV2> getStarTrees() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public ValidDocIndexReader getValidDocIndex() {
+ return null;
+ }
+
+ @Override
+ public GenericRow getRecord(int docId, GenericRow reuse) {
+ // NOTE: Use PinotSegmentRecordReader to read immutable segment
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Dictionary getDictionary(String column) {
+ return null;
+ }
+
+ @Override
+ public ForwardIndexReader getForwardIndex(String column) {
+ return null;
+ }
+
+ @Override
+ public InvertedIndexReader getInvertedIndex(String column) {
+ return null;
+ }
+
+ @Override
+ public long getSegmentSizeBytes() {
+ return 0;
+ }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index b7b1fe0..59dec61 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -101,6 +101,9 @@ public class ImmutableSegmentLoader {
// Load the metadata again since converter and pre-processor may have changed it
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+ if (segmentMetadata.getTotalDocs() == 0) {
+ return new EmptyIndexSegment(segmentMetadata);
+ }
// Load the segment
ReadMode readMode = indexLoadingConfig.getReadMode();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index be9f092..ff7dc01 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.pinot.core.indexsegment.IndexSegment;
import org.apache.pinot.core.query.config.SegmentPrunerConfig;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
/**
* The <code>SegmentPrunerService</code> class contains multiple segment pruners and provides service to prune segments
* against all pruners.
+ * {@link ValidSegmentPruner} is always set as the first pruner
*/
public class SegmentPrunerService {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPrunerService.class);
@@ -38,11 +40,18 @@ public class SegmentPrunerService {
public SegmentPrunerService(SegmentPrunerConfig config) {
int numPruners = config.numSegmentPruners();
- _segmentPruners = new ArrayList<>(numPruners);
+ _segmentPruners = new ArrayList<>(numPruners + 1);
+
+ String validSegmentPrunerName = ValidSegmentPruner.class.getSimpleName();
+ _segmentPruners.add(SegmentPrunerProvider.getSegmentPruner(validSegmentPrunerName, new PinotConfiguration()));
+
for (int i = 0; i < numPruners; i++) {
- LOGGER.info("Adding segment pruner: " + config.getSegmentPrunerName(i));
- _segmentPruners.add(
- SegmentPrunerProvider.getSegmentPruner(config.getSegmentPrunerName(i), config.getSegmentPrunerConfig(i)));
+ String segmentPrunerName = config.getSegmentPrunerName(i);
+ if (!validSegmentPrunerName.equalsIgnoreCase(segmentPrunerName)) {
+ LOGGER.info("Adding segment pruner: " + segmentPrunerName);
+ _segmentPruners
+ .add(SegmentPrunerProvider.getSegmentPruner(segmentPrunerName, config.getSegmentPrunerConfig(i)));
+ }
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index 1be48c4..cc1750e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -40,7 +40,8 @@ public class RealtimeSegmentRecordReader implements RecordReader {
public RealtimeSegmentRecordReader(MutableSegmentImpl realtimeSegment, @Nullable String sortedColumn) {
_realtimeSegment = realtimeSegment;
_numDocs = realtimeSegment.getNumDocsIndexed();
- _sortedDocIdIterationOrder = sortedColumn != null ? realtimeSegment.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
+ _sortedDocIdIterationOrder = sortedColumn != null && realtimeSegment.getNumDocsIndexed() > 0 ? realtimeSegment
+ .getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
}
public int[] getSortedDocIdIterationOrder() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index f0dc321..9041632 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -127,6 +127,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
this.schema = schema;
this.totalDocs = segmentIndexCreationInfo.getTotalDocs();
+ if (totalDocs == 0) {
+ return;
+ }
Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs();
Set<String> invertedIndexColumns = new HashSet<>();
@@ -544,20 +547,34 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
endTime = Long.parseLong(config.getEndTime());
timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
} else {
- String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
- String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
-
- if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
- // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
- DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(config.getSimpleDateFormat());
- startTime = dateTimeFormatter.parseMillis(startTimeStr);
- endTime = dateTimeFormatter.parseMillis(endTimeStr);
- timeUnit = TimeUnit.MILLISECONDS;
+ if (totalDocs > 0) {
+ String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
+ String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
+
+ if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+ // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
+ DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(config.getSimpleDateFormat());
+ startTime = dateTimeFormatter.parseMillis(startTimeStr);
+ endTime = dateTimeFormatter.parseMillis(endTimeStr);
+ timeUnit = TimeUnit.MILLISECONDS;
+ } else {
+ // by default, time column type is TimeColumnType.EPOCH
+ startTime = Long.parseLong(startTimeStr);
+ endTime = Long.parseLong(endTimeStr);
+ timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+ }
} else {
- // by default, time column type is TimeColumnType.EPOCH
- startTime = Long.parseLong(startTimeStr);
- endTime = Long.parseLong(endTimeStr);
- timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+ // No records in segment. Use current time as start/end
+ long now = System.currentTimeMillis();
+ if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+ startTime = now;
+ endTime = now;
+ timeUnit = TimeUnit.MILLISECONDS;
+ } else {
+ timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+ startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+ endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+ }
}
}
@@ -656,10 +673,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
}
// NOTE: Min/max could be null for real-time aggregate metrics.
- Object min = columnIndexCreationInfo.getMin();
- Object max = columnIndexCreationInfo.getMax();
- if (min != null && max != null) {
- addColumnMinMaxValueInfo(properties, column, min.toString(), max.toString());
+ if (totalDocs > 0) {
+ Object min = columnIndexCreationInfo.getMin();
+ Object max = columnIndexCreationInfo.getMax();
+ if (min != null && max != null) {
+ addColumnMinMaxValueInfo(properties, column, min.toString(), max.toString());
+ }
}
String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 3078dc7..19ec420 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -145,7 +145,6 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
throws Exception {
this.config = config;
recordReader = dataSource.getRecordReader();
- Preconditions.checkState(recordReader.hasNext(), "No record in data source");
dataSchema = config.getSchema();
_recordTransformer = recordTransformer;
@@ -240,8 +239,14 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
ColumnStatistics timeColumnStatistics = segmentStats.getColumnProfileFor(config.getTimeColumnName());
int sequenceId = config.getSequenceId();
if (timeColumnStatistics != null) {
- segmentName = config.getSegmentNameGenerator()
- .generateSegmentName(sequenceId, timeColumnStatistics.getMinValue(), timeColumnStatistics.getMaxValue());
+ if (totalDocs > 0) {
+ segmentName = config.getSegmentNameGenerator()
+ .generateSegmentName(sequenceId, timeColumnStatistics.getMinValue(), timeColumnStatistics.getMaxValue());
+ } else {
+ // Generate a unique name for a segment with no rows
+ long now = System.currentTimeMillis();
+ segmentName = config.getSegmentNameGenerator().generateSegmentName(sequenceId, now, now);
+ }
} else {
segmentName = config.getSegmentNameGenerator().generateSegmentName(sequenceId, null, null);
}
@@ -272,7 +277,9 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
convertFormatIfNecessary(segmentOutputDir);
// Build star-tree V2 if necessary
- buildStarTreeV2IfNecessary(segmentOutputDir);
+ if (totalDocs > 0) {
+ buildStarTreeV2IfNecessary(segmentOutputDir);
+ }
// Compute CRC and creation time
long crc = CrcUtils.forAllFilesInFolder(segmentOutputDir).computeCrc();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/EmptyDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/EmptyDataSource.java
new file mode 100644
index 0000000..8eaf1d8
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/EmptyDataSource.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.datasource;
+
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The {@code EmptyDataSource} class is the data source for a column in an immutable segment with 0 rows.
+ */
+public class EmptyDataSource extends BaseDataSource {
+
+ public EmptyDataSource(ColumnMetadata columnMetadata) {
+ super(new EmptyDataSourceMetadata(columnMetadata), null, null, null, null, null, null, null, null, null, null);
+ }
+
+ private static class EmptyDataSourceMetadata implements DataSourceMetadata {
+ final FieldSpec _fieldSpec;
+
+ EmptyDataSourceMetadata(ColumnMetadata columnMetadata) {
+ _fieldSpec = columnMetadata.getFieldSpec();
+ }
+
+ @Override
+ public FieldSpec getFieldSpec() {
+ return _fieldSpec;
+ }
+
+ @Override
+ public boolean isSorted() {
+ return false;
+ }
+
+ @Override
+ public int getNumDocs() {
+ return 0;
+ }
+
+ @Override
+ public int getNumValues() {
+ return 0;
+ }
+
+ @Override
+ public int getMaxNumValuesPerMVEntry() {
+ return -1;
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMinValue() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Comparable getMaxValue() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public PartitionFunction getPartitionFunction() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public Set<Integer> getPartitions() {
+ return null;
+ }
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
index acfeae3..1055fd7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
@@ -51,6 +51,9 @@ public class RecordReaderSampleDataTest {
private static final File JSON_SAMPLE_DATA_FILE = new File(Preconditions
.checkNotNull(RecordReaderSampleDataTest.class.getClassLoader().getResource("data/test_sample_data.json"))
.getFile());
+ private static final File JSON_EMPTY_DATA_FILE = new File(Preconditions
+ .checkNotNull(RecordReaderSampleDataTest.class.getClassLoader().getResource("data/test_empty_data.json"))
+ .getFile());
private final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG)
.addSingleValueDimension("column2", FieldSpec.DataType.INT)
.addSingleValueDimension("column3", FieldSpec.DataType.STRING)
@@ -106,6 +109,15 @@ public class RecordReaderSampleDataTest {
}
}
+ @Test
+ public void testRecordReaderEmptyFile()
+ throws Exception {
+ try (RecordReader jsonRecordReader = RecordReaderFactory
+ .getRecordReader(FileFormat.JSON, JSON_EMPTY_DATA_FILE, SCHEMA.getColumnNames(), null)) {
+ Assert.assertFalse(jsonRecordReader.hasNext());
+ }
+ }
+
/**
* Tests that record extractor is able to handle missing fields correctly (incoming and outgoing are missing from data)
* @throws Exception
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNoRecordsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNoRecordsTest.java
new file mode 100644
index 0000000..e1b682d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNoRecordsTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.creator;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests segment generation when the record reader yields no records (0-row segment)
+ */
+public class SegmentGenerationWithNoRecordsTest {
+ private static final String STRING_COLUMN1 = "string_col1";
+ private static final String STRING_COLUMN2 = "string_col2";
+ private static final String STRING_COLUMN3 = "string_col3";
+ private static final String STRING_COLUMN4 = "string_col4";
+ private static final String LONG_COLUMN1 = "long_col1";
+ private static final String LONG_COLUMN2 = "long_col2";
+ private static final String LONG_COLUMN3 = "long_col3";
+ private static final String LONG_COLUMN4 = "long_col4";
+ private static final String MV_INT_COLUMN = "mv_col";
+ private static final String DATE_TIME_COLUMN = "date_time_col";
+ private static final String SEGMENT_DIR_NAME =
+ FileUtils.getTempDirectoryPath() + File.separator + "segmentNoRecordsTest";
+ private static final String SEGMENT_NAME = "testSegment";
+
+ private Schema _schema;
+ private TableConfig _tableConfig;
+
+ @BeforeClass
+ public void setup() {
+ _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+ .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
+ .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+ .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2)).setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+ .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).build();
+ _tableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
+ _schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+ .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+ .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+ .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+ }
+
+ @BeforeMethod
+ public void reset() {
+ FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME));
+ }
+
+ @Test
+ public void testNumDocs()
+ throws Exception {
+ File segmentDir = buildSegment(_tableConfig, _schema);
+ SegmentMetadataImpl metadata = SegmentDirectory.loadSegmentMetadata(segmentDir);
+ Assert.assertEquals(metadata.getTotalDocs(), 0);
+ Assert.assertEquals(metadata.getTimeColumn(), DATE_TIME_COLUMN);
+ Assert.assertEquals(metadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+ Assert.assertEquals(metadata.getStartTime(), metadata.getEndTime());
+ Assert.assertTrue(metadata.getAllColumns().containsAll(_schema.getColumnNames()));
+ PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(segmentDir);
+ Assert.assertFalse(segmentRecordReader.hasNext());
+ }
+
+ private File buildSegment(final TableConfig tableConfig, final Schema schema)
+ throws Exception {
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+ config.setOutDir(SEGMENT_DIR_NAME);
+ config.setSegmentName(SEGMENT_NAME);
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config, new GenericRowRecordReader(Collections.emptyList()));
+ driver.build();
+ driver.getOutputDirectory().deleteOnExit();
+ return driver.getOutputDirectory();
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index c28a2d8..fa00339 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -51,8 +51,13 @@ import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.pql.parsers.Pql2Compiler;
import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -63,10 +68,12 @@ import com.yammer.metrics.core.MetricsRegistry;
public class QueryExecutorTest {
private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
+ private static final String EMPTY_JSON_DATA_PATH = "data/test_empty_data.json";
private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
private static final String TABLE_NAME = "testTable";
private static final int NUM_SEGMENTS_TO_GENERATE = 2;
+ private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
private static final Pql2Compiler COMPILER = new Pql2Compiler();
private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20);
@@ -85,9 +92,12 @@ public class QueryExecutorTest {
URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
Assert.assertNotNull(resourceUrl);
File avroFile = new File(resourceUrl.getFile());
- for (int i = 0; i < NUM_SEGMENTS_TO_GENERATE; i++) {
+ Schema schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(avroFile);
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+ int i = 0;
+ for (; i < NUM_SEGMENTS_TO_GENERATE; i++) {
SegmentGeneratorConfig config =
- SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, INDEX_DIR, TABLE_NAME);
+ SegmentTestUtils.getSegmentGeneratorConfig(avroFile, FileFormat.AVRO, INDEX_DIR, TABLE_NAME, tableConfig, schema);
config.setSegmentNamePostfix(Integer.toString(i));
SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
driver.init(config);
@@ -100,6 +110,19 @@ public class QueryExecutorTest {
_indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
_segmentNames.add(driver.getSegmentName());
}
+ resourceUrl = getClass().getClassLoader().getResource(EMPTY_JSON_DATA_PATH);
+ Assert.assertNotNull(resourceUrl);
+ File jsonFile = new File(resourceUrl.getFile());
+ for (; i < NUM_SEGMENTS_TO_GENERATE + NUM_EMPTY_SEGMENTS_TO_GENERATE; i++) {
+ SegmentGeneratorConfig config =
+ SegmentTestUtils.getSegmentGeneratorConfig(jsonFile, FileFormat.JSON, INDEX_DIR, TABLE_NAME, tableConfig, schema);
+ config.setSegmentNamePostfix(Integer.toString(i));
+ SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+ driver.init(config);
+ driver.build();
+ _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
+ _segmentNames.add(driver.getSegmentName());
+ }
// Mock the instance data manager
_serverMetrics = new ServerMetrics(new MetricsRegistry());
diff --git a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
index 71f42fa..430f597 100644
--- a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -18,16 +18,26 @@
*/
package org.apache.pinot.realtime.converter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
import org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.Assert;
@@ -36,6 +46,23 @@ import org.testng.annotations.Test;
public class RealtimeSegmentConverterTest {
+ private static final String STRING_COLUMN1 = "string_col1";
+ private static final String STRING_COLUMN2 = "string_col2";
+ private static final String STRING_COLUMN3 = "string_col3";
+ private static final String STRING_COLUMN4 = "string_col4";
+ private static final String LONG_COLUMN1 = "long_col1";
+ private static final String LONG_COLUMN2 = "long_col2";
+ private static final String LONG_COLUMN3 = "long_col3";
+ private static final String LONG_COLUMN4 = "long_col4";
+ private static final String MV_INT_COLUMN = "mv_col";
+ private static final String DATE_TIME_COLUMN = "date_time_col";
+
+ private static final File TMP_DIR = new File(FileUtils.getTempDirectory(), RealtimeSegmentConverterTest.class.getName());
+
+ public void setup() {
+ Preconditions.checkState(TMP_DIR.mkdirs());
+ }
+
@Test
public void testNoVirtualColumnsInSchema() {
Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING)
@@ -47,4 +74,74 @@ public class RealtimeSegmentConverterTest {
Schema newSchema = RealtimeSegmentConverter.getUpdatedSchema(schema);
Assert.assertEquals(newSchema.getColumnNames().size(), 2);
}
+
+ @Test
+ public void testNoRecordsIndexed()
+ throws Exception {
+
+ File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+ TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+ .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
+ .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+ .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2)).setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+ .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).build();
+ Schema schema = new Schema.SchemaBuilder()
+ .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+ .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+ .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+ .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+ .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+ .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+ .build();
+
+ String tableNameWithType = tableConfig.getTableName();
+ String segmentName = "testTable__0__0__123456";
+ IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+ RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+ new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+ .setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN)
+ .setCapacity(1000).setAvgNumMultiValues(3)
+ .setNoDictionaryColumns(Sets.newHashSet(LONG_COLUMN2))
+ .setVarLengthDictionaryColumns(Sets.newHashSet(STRING_COLUMN3))
+ .setInvertedIndexColumns(Sets.newHashSet(STRING_COLUMN1))
+ .setRealtimeSegmentZKMetadata(getRealtimeSegmentZKMetadata(segmentName))
+ .setOffHeap(true).setMemoryManager(new DirectMemoryManager(segmentName))
+ .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tmpDir, "stats")))
+ .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+ // create mutable segment impl
+ MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+
+ File outputDir = new File(tmpDir, "outputDir");
+ RealtimeSegmentConverter converter =
+ new RealtimeSegmentConverter(mutableSegmentImpl, outputDir.getAbsolutePath(), schema,
+ tableNameWithType, tableConfig, segmentName, indexingConfig.getSortedColumn().get(0),
+ indexingConfig.getInvertedIndexColumns(), null, null, indexingConfig.getNoDictionaryColumns(),
+ indexingConfig.getVarLengthDictionaryColumns(), false);
+
+ converter.build(SegmentVersion.v3, null);
+ SegmentMetadataImpl metadata = new SegmentMetadataImpl(new File(outputDir, segmentName));
+ Assert.assertEquals(metadata.getTotalDocs(), 0);
+ Assert.assertEquals(metadata.getTimeColumn(), DATE_TIME_COLUMN);
+ Assert.assertEquals(metadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+ Assert.assertEquals(metadata.getStartTime(), metadata.getEndTime());
+ Assert.assertTrue(metadata.getAllColumns().containsAll(schema.getColumnNames()));
+ System.out.println(outputDir);
+ }
+
+ private RealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String segmentName) {
+ RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = new RealtimeSegmentZKMetadata();
+ realtimeSegmentZKMetadata.setCreationTime(System.currentTimeMillis());
+ realtimeSegmentZKMetadata.setSegmentName(segmentName);
+ return realtimeSegmentZKMetadata;
+ }
+
+ public void destroy() {
+ FileUtils.deleteQuietly(TMP_DIR);
+ }
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java
index ae8f245..38da512 100644
--- a/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java
@@ -94,6 +94,16 @@ public class SegmentTestUtils {
return segmentGeneratorConfig;
}
+ public static SegmentGeneratorConfig getSegmentGeneratorConfig(File inputFile, FileFormat inputFormat, File outputDir,
+ String tableName, TableConfig tableConfig, Schema schema) {
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+ segmentGeneratorConfig.setInputFilePath(inputFile.getAbsolutePath());
+ segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+ segmentGeneratorConfig.setFormat(inputFormat);
+ segmentGeneratorConfig.setTableName(tableName);
+ return segmentGeneratorConfig;
+ }
+
public static Schema extractSchemaFromAvroWithoutTime(File avroFile)
throws IOException {
DataFileStream<GenericRecord> dataStream =
diff --git a/pinot-core/src/test/resources/data/test_empty_data.json b/pinot-core/src/test/resources/data/test_empty_data.json
new file mode 100644
index 0000000..e69de29
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org