You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/02/17 20:51:14 UTC

[incubator-pinot] branch master updated: Handle creation of segments with 0 rows (#6466)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 60b0c5f  Handle creation of segments with 0 rows (#6466)
60b0c5f is described below

commit 60b0c5f2dce5e82c7b1013a3b8d8935774d12d72
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Feb 17 12:50:58 2021 -0800

    Handle creation of segments with 0 rows (#6466)
    
    * Handle creation of segments with 0 rows
    
    * Broker- and server-side pruners for empty segments
    
    * Set current time as start/end time if totalDocs is 0. Ignore empty segments when computing the time boundary
    
    * EmptyIndexSegment and EmptyDataSource
    
    * Make ValidSegmentPruner the first pruner
---
 .../routing/segmentpruner/EmptySegmentPruner.java  | 130 ++++++++++++++
 .../segmentpruner/SegmentPrunerFactory.java        |  16 +-
 .../routing/timeboundary/TimeBoundaryManager.java  |  19 +-
 .../routing/segmentpruner/SegmentPrunerTest.java   | 195 ++++++++++++++++++---
 .../timeboundary/TimeBoundaryManagerTest.java      |  19 ++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  15 +-
 .../controller/utils/SegmentMetadataMockUtils.java |   6 +-
 .../data/readers/PinotSegmentRecordReader.java     |  54 +++---
 .../indexsegment/immutable/EmptyIndexSegment.java  | 120 +++++++++++++
 .../immutable/ImmutableSegmentLoader.java          |   3 +
 .../core/query/pruner/SegmentPrunerService.java    |  17 +-
 .../converter/RealtimeSegmentRecordReader.java     |   3 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  53 ++++--
 .../impl/SegmentIndexCreationDriverImpl.java       |  15 +-
 .../segment/index/datasource/EmptyDataSource.java  |  94 ++++++++++
 .../data/readers/RecordReaderSampleDataTest.java   |  12 ++
 .../SegmentGenerationWithNoRecordsTest.java        | 117 +++++++++++++
 .../pinot/query/executor/QueryExecutorTest.java    |  27 ++-
 .../converter/RealtimeSegmentConverterTest.java    | 103 ++++++++++-
 .../segments/v1/creator/SegmentTestUtils.java      |  10 ++
 .../src/test/resources/data/test_empty_data.json   |   0
 21 files changed, 930 insertions(+), 98 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
new file mode 100644
index 0000000..8e45331
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/EmptySegmentPruner.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.segmentpruner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The {@code EmptySegmentPruner} prunes segments that have 0 totalDocs.
+ * It does not prune segments with -1 totalDocs (which can be either an error case or a CONSUMING segment).
+ */
+public class EmptySegmentPruner implements SegmentPruner {
+  private static final Logger LOGGER = LoggerFactory.getLogger(EmptySegmentPruner.class);
+
+  private final String _tableNameWithType;
+  private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+  private final String _segmentZKMetadataPathPrefix;
+
+  private final Map<String, Long> _segmentTotalDocsMap = new HashMap<>();
+  private final Set<String> _emptySegments = new HashSet<>();
+
+  public EmptySegmentPruner(TableConfig tableConfig, ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    _tableNameWithType = tableConfig.getTableName();
+    _propertyStore = propertyStore;
+    _segmentZKMetadataPathPrefix = ZKMetadataProvider.constructPropertyStorePathForResource(_tableNameWithType) + "/";
+  }
+
+  @Override
+  public void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
+    // Bulk load info for all online segments
+    int numSegments = onlineSegments.size();
+    List<String> segments = new ArrayList<>(numSegments);
+    List<String> segmentZKMetadataPaths = new ArrayList<>(numSegments);
+    for (String segment : onlineSegments) {
+      segments.add(segment);
+      segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
+    }
+    List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT, false);
+    for (int i = 0; i < numSegments; i++) {
+      String segment = segments.get(i);
+      long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment, znRecords.get(i));
+      _segmentTotalDocsMap.put(segment, totalDocs);
+      if (totalDocs == 0) {
+        _emptySegments.add(segment);
+      }
+    }
+  }
+
+  private long extractTotalDocsFromSegmentZKMetaZNRecord(String segment, @Nullable ZNRecord znRecord) {
+    if (znRecord == null) {
+      LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _tableNameWithType);
+      return -1;
+    }
+    return znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
+  }
+
+  @Override
+  public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
+      Set<String> onlineSegments) {
+    // NOTE: We don't reload all the segment ZK metadata on every external view change, only that of newly added
+    //       segments (removed ones are dropped). ZK metadata changes of refreshed segments won't be picked up.
+    for (String segment : onlineSegments) {
+      _segmentTotalDocsMap.computeIfAbsent(segment, k -> {
+        long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(k,
+            _propertyStore.get(_segmentZKMetadataPathPrefix + k, null, AccessOption.PERSISTENT));
+        if (totalDocs == 0) {
+          _emptySegments.add(segment);
+        }
+        return totalDocs;
+      });
+    }
+    _segmentTotalDocsMap.keySet().retainAll(onlineSegments);
+    _emptySegments.retainAll(onlineSegments);
+  }
+
+  @Override
+  public synchronized void refreshSegment(String segment) {
+    long totalDocs = extractTotalDocsFromSegmentZKMetaZNRecord(segment,
+        _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT));
+    _segmentTotalDocsMap.put(segment, totalDocs);
+    if (totalDocs == 0) {
+      _emptySegments.add(segment);
+    } else {
+      _emptySegments.remove(segment);
+    }
+  }
+
+  /**
+   * Prune out segments which are empty
+   */
+  @Override
+  public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
+    Set<String> selectedSegments = new HashSet<>(segments);
+    selectedSegments.removeAll(_emptySegments);
+    return selectedSegments;
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
index 41137e9..2a61927 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerFactory.java
@@ -47,26 +47,30 @@ public class SegmentPrunerFactory {
   public static List<SegmentPruner> getSegmentPruners(TableConfig tableConfig,
       ZkHelixPropertyStore<ZNRecord> propertyStore) {
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
+    List<SegmentPruner> segmentPruners = new ArrayList<>();
+    // Always prune out empty segments first
+    segmentPruners.add(new EmptySegmentPruner(tableConfig, propertyStore));
+
     if (routingConfig != null) {
       List<String> segmentPrunerTypes = routingConfig.getSegmentPrunerTypes();
       if (segmentPrunerTypes != null) {
-        List<SegmentPruner> segmentPruners = new ArrayList<>(segmentPrunerTypes.size());
+        List<SegmentPruner> configuredSegmentPruners = new ArrayList<>(segmentPrunerTypes.size());
         for (String segmentPrunerType : segmentPrunerTypes) {
           if (RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
             PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
             if (partitionSegmentPruner != null) {
-              segmentPruners.add(partitionSegmentPruner);
+              configuredSegmentPruners.add(partitionSegmentPruner);
             }
           }
 
           if (RoutingConfig.TIME_SEGMENT_PRUNER_TYPE.equalsIgnoreCase(segmentPrunerType)) {
             TimeSegmentPruner timeSegmentPruner = getTimeSegmentPruner(tableConfig, propertyStore);
             if (timeSegmentPruner != null) {
-              segmentPruners.add(timeSegmentPruner);
+              configuredSegmentPruners.add(timeSegmentPruner);
             }
           }
         }
-        return sortSegmentPruners(segmentPruners);
+        segmentPruners.addAll(sortSegmentPruners(configuredSegmentPruners));
       } else {
         // Handle legacy configs for backward-compatibility
         TableType tableType = tableConfig.getTableType();
@@ -76,12 +80,12 @@ public class SegmentPrunerFactory {
             && LEGACY_PARTITION_AWARE_REALTIME_ROUTING.equalsIgnoreCase(routingTableBuilderName))) {
           PartitionSegmentPruner partitionSegmentPruner = getPartitionSegmentPruner(tableConfig, propertyStore);
           if (partitionSegmentPruner != null) {
-            return Collections.singletonList(getPartitionSegmentPruner(tableConfig, propertyStore));
+            segmentPruners.add(getPartitionSegmentPruner(tableConfig, propertyStore));
           }
         }
       }
     }
-    return Collections.emptyList();
+    return segmentPruners;
   }
 
   @Nullable
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index 613b755..ddd5dbb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -125,15 +125,18 @@ public class TimeBoundaryManager {
       LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _offlineTableName);
       return INVALID_END_TIME_MS;
     }
-
-    long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
-    if (endTime > 0) {
-      TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
-      return timeUnit.toMillis(endTime);
-    } else {
-      LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
-      return INVALID_END_TIME_MS;
+    long totalDocs = znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
+    long endTimeMs = INVALID_END_TIME_MS;
+    if (totalDocs != 0) {
+      long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
+      if (endTime > 0) {
+        TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
+        endTimeMs = timeUnit.toMillis(endTime);
+      } else {
+        LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
+      }
     }
+    return endTimeMs;
   }
 
   private void updateTimeBoundaryInfo(long maxEndTimeMs) {
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index da3744a..a639709 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -117,49 +117,64 @@ public class SegmentPrunerTest {
     when(tableConfig.getIndexingConfig()).thenReturn(indexingConfig);
 
     // Routing config is missing
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Segment pruner type is not configured
     RoutingConfig routingConfig = mock(RoutingConfig.class);
     when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Segment partition config is missing
     when(routingConfig.getSegmentPrunerTypes())
         .thenReturn(Collections.singletonList(RoutingConfig.PARTITION_SEGMENT_PRUNER_TYPE));
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Column partition config is missing
     Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
     when(indexingConfig.getSegmentPartitionConfig()).thenReturn(new SegmentPartitionConfig(columnPartitionConfigMap));
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Partition-aware segment pruner should be returned
     columnPartitionConfigMap.put(PARTITION_COLUMN, new ColumnPartitionConfig("Modulo", 5));
-    List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 2);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
 
     // Do not allow multiple partition columns
     columnPartitionConfigMap.put("anotherPartitionColumn", new ColumnPartitionConfig("Modulo", 5));
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Should be backward-compatible with legacy config
     columnPartitionConfigMap.remove("anotherPartitionColumn");
     when(routingConfig.getSegmentPrunerTypes()).thenReturn(null);
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
     when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
     when(routingConfig.getRoutingTableBuilderName())
         .thenReturn(SegmentPrunerFactory.LEGACY_PARTITION_AWARE_OFFLINE_ROUTING);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
+    assertEquals(segmentPruners.size(), 2);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
     when(tableConfig.getTableType()).thenReturn(TableType.REALTIME);
     when(routingConfig.getRoutingTableBuilderName())
         .thenReturn(SegmentPrunerFactory.LEGACY_PARTITION_AWARE_REALTIME_ROUTING);
     segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof PartitionSegmentPruner);
+    assertEquals(segmentPruners.size(), 2);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertTrue(segmentPruners.get(1) instanceof PartitionSegmentPruner);
   }
 
   @Test
@@ -169,29 +184,37 @@ public class SegmentPrunerTest {
     setSchemaDateTimeFieldSpec(RAW_TABLE_NAME, TimeUnit.HOURS);
 
     // Routing config is missing
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Segment pruner type is not configured
     RoutingConfig routingConfig = mock(RoutingConfig.class);
     when(tableConfig.getRoutingConfig()).thenReturn(routingConfig);
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Validation config is missing
     when(routingConfig.getSegmentPrunerTypes())
         .thenReturn(Collections.singletonList(RoutingConfig.TIME_SEGMENT_PRUNER_TYPE));
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Time column is missing
     SegmentsValidationAndRetentionConfig validationConfig = mock(SegmentsValidationAndRetentionConfig.class);
     when(tableConfig.getValidationConfig()).thenReturn(validationConfig);
-    assertEquals(SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore), Collections.emptySet());
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 1);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
 
     // Time range pruner should be returned
     when(validationConfig.getTimeColumnName()).thenReturn(TIME_COLUMN);
-    List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
-    assertEquals(segmentPruners.size(), 1);
-    assertTrue(segmentPruners.get(0) instanceof TimeSegmentPruner);
-
+    segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
+    assertEquals(segmentPruners.size(), 2);
+    assertTrue(segmentPruners.get(0) instanceof EmptySegmentPruner);
+    assertTrue(segmentPruners.get(1) instanceof TimeSegmentPruner);
   }
 
   @Test
@@ -460,6 +483,128 @@ public class SegmentPrunerTest {
     assertEquals(segmentPruner.prune(brokerRequest5, onlineSegments), Collections.emptySet());
   }
 
+  @Test
+  public void testEmptySegmentPruner() {
+    CalciteSqlCompiler sqlCompiler = new CalciteSqlCompiler();
+    BrokerRequest brokerRequest1 = sqlCompiler.compileToBrokerRequest(QUERY_1);
+    BrokerRequest brokerRequest2 = sqlCompiler.compileToBrokerRequest(QUERY_2);
+    BrokerRequest brokerRequest3 = sqlCompiler.compileToBrokerRequest(QUERY_3);
+
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
+
+    TableConfig tableConfig = getTableConfig(RAW_TABLE_NAME, TableType.REALTIME);
+
+    // init with list of segments
+    EmptySegmentPruner segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore);
+    Set<String> onlineSegments = new HashSet<>();
+    String segment0 = "segment0";
+    onlineSegments.add(segment0);
+    setRealtimeSegmentZKTotalDocsMetadata(segment0, 10);
+    String segment1 = "segment1";
+    onlineSegments.add(segment1);
+    setRealtimeSegmentZKTotalDocsMetadata(segment1, 0);
+    segmentPruner.init(externalView, idealState, onlineSegments);
+    assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Collections.singletonList(segment0)));
+    assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Collections.singletonList(segment0)));
+    assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1))),
+        new HashSet<>(Collections.singletonList(segment0)));
+
+    // init with empty list of segments
+    segmentPruner = new EmptySegmentPruner(tableConfig, _propertyStore);
+    onlineSegments.clear();
+    segmentPruner.init(externalView, idealState, onlineSegments);
+    assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptySet()), Collections.emptySet());
+    assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptySet()), Collections.emptySet());
+    assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptySet()), Collections.emptySet());
+
+    // Segments without metadata (not updated yet) should not be pruned
+    String newSegment = "newSegment";
+    onlineSegments.add(newSegment);
+    segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+    assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(newSegment))),
+        Collections.singletonList(newSegment));
+    assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Collections.singletonList(newSegment))),
+        Collections.singletonList(newSegment));
+    assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Collections.singletonList(newSegment))),
+        Collections.singletonList(newSegment));
+
+    // Segments without totalDocs metadata should not be pruned
+    onlineSegments.clear();
+    String segmentWithoutTotalDocsMetadata = "segmentWithoutTotalDocsMetadata";
+    onlineSegments.add(segmentWithoutTotalDocsMetadata);
+    RealtimeSegmentZKMetadata segmentZKMetadataWithoutTotalDocsMetadata = new RealtimeSegmentZKMetadata();
+    segmentZKMetadataWithoutTotalDocsMetadata.setSegmentName(segmentWithoutTotalDocsMetadata);
+    segmentZKMetadataWithoutTotalDocsMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    ZKMetadataProvider
+        .setRealtimeSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, segmentZKMetadataWithoutTotalDocsMetadata);
+    segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+    assertEquals(
+        segmentPruner.prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithoutTotalDocsMetadata))),
+        Collections.singletonList(segmentWithoutTotalDocsMetadata));
+    assertEquals(
+        segmentPruner.prune(brokerRequest2, new HashSet<>(Collections.singletonList(segmentWithoutTotalDocsMetadata))),
+        Collections.singletonList(segmentWithoutTotalDocsMetadata));
+    assertEquals(
+        segmentPruner.prune(brokerRequest3, new HashSet<>(Collections.singletonList(segmentWithoutTotalDocsMetadata))),
+        Collections.singletonList(segmentWithoutTotalDocsMetadata));
+
+    // Segments with -1 totalDocs should not be pruned
+    onlineSegments.clear();
+    String segmentWithNegativeTotalDocsMetadata = "segmentWithNegativeTotalDocsMetadata";
+    onlineSegments.add(segmentWithNegativeTotalDocsMetadata);
+    setRealtimeSegmentZKTotalDocsMetadata(segmentWithNegativeTotalDocsMetadata, -1);
+    segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+    assertEquals(segmentPruner
+            .prune(brokerRequest1, new HashSet<>(Collections.singletonList(segmentWithNegativeTotalDocsMetadata))),
+        Collections.singletonList(segmentWithNegativeTotalDocsMetadata));
+    assertEquals(segmentPruner
+            .prune(brokerRequest2, new HashSet<>(Collections.singletonList(segmentWithNegativeTotalDocsMetadata))),
+        Collections.singletonList(segmentWithNegativeTotalDocsMetadata));
+    assertEquals(segmentPruner
+            .prune(brokerRequest3, new HashSet<>(Collections.singletonList(segmentWithNegativeTotalDocsMetadata))),
+        Collections.singletonList(segmentWithNegativeTotalDocsMetadata));
+
+    // Prune segments with 0 total docs
+    onlineSegments.clear();
+    onlineSegments.add(segment0);
+    setRealtimeSegmentZKTotalDocsMetadata(segment0, 10);
+    onlineSegments.add(segment1);
+    setRealtimeSegmentZKTotalDocsMetadata(segment1, 0);
+    String segment2 = "segment2";
+    onlineSegments.add(segment2);
+    setRealtimeSegmentZKTotalDocsMetadata(segment2, -1);
+
+    segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
+    assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Arrays.asList(segment0, segment2)));
+    assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Arrays.asList(segment0, segment2)));
+    assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Arrays.asList(segment0, segment2)));
+
+    // Update metadata without external view change or refreshing should have no effect
+    setSegmentZKTimeRangeMetadata(segment2, 20, 30, TimeUnit.DAYS);
+    setRealtimeSegmentZKTotalDocsMetadata(segment2, 0);
+    assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Arrays.asList(segment0, segment2)));
+    assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Arrays.asList(segment0, segment2)));
+    assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Arrays.asList(segment0, segment2)));
+
+    // Refresh the changed segment should update the segment pruner
+    segmentPruner.refreshSegment(segment2);
+    assertEquals(segmentPruner.prune(brokerRequest1, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Collections.singletonList(segment0)));
+    assertEquals(segmentPruner.prune(brokerRequest2, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Collections.singletonList(segment0)));
+    assertEquals(segmentPruner.prune(brokerRequest3, new HashSet<>(Arrays.asList(segment0, segment1, segment2))),
+        new HashSet<>(Collections.singletonList(segment0)));
+  }
+
   private TableConfig getTableConfig(String rawTableName, TableType type) {
     return new TableConfigBuilder(type).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN).build();
   }
@@ -493,4 +638,12 @@ public class SegmentPrunerTest {
     realtimeSegmentZKMetadata.setTimeUnit(unit);
     ZKMetadataProvider.setRealtimeSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, realtimeSegmentZKMetadata);
   }
+
+  private void setRealtimeSegmentZKTotalDocsMetadata(String segment, long totalDocs) {
+    RealtimeSegmentZKMetadata realtimeSegmentZKMetadata = new RealtimeSegmentZKMetadata();
+    realtimeSegmentZKMetadata.setSegmentName(segment);
+    realtimeSegmentZKMetadata.setTotalDocs(totalDocs);
+    realtimeSegmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.IN_PROGRESS);
+    ZKMetadataProvider.setRealtimeSegmentZKMetadata(_propertyStore, REALTIME_TABLE_NAME, realtimeSegmentZKMetadata);
+  }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index 00ca6c7..345968a 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -125,6 +125,13 @@ public class TimeBoundaryManagerTest {
     timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
 
+    // Add new segment with larger end time but 0 total docs, should not update time boundary
+    String segmentEmpty = "segmentEmpty";
+    onlineSegments.add(segmentEmpty);
+    setSegmentZKMetadataWithTotalDocs(rawTableName, segmentEmpty, 6, timeUnit, 0);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
+
     // Add a new segment with smaller end time should not change the time boundary
     String segment2 = "segment2";
     onlineSegments.add(segment2);
@@ -194,6 +201,18 @@ public class TimeBoundaryManagerTest {
             offlineSegmentZKMetadata);
   }
 
+  private void setSegmentZKMetadataWithTotalDocs(String rawTableName, String segment, int endTimeInDays,
+      TimeUnit timeUnit, long totalDocs) {
+    OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
+    offlineSegmentZKMetadata.setSegmentName(segment);
+    offlineSegmentZKMetadata.setEndTime(timeUnit.convert(endTimeInDays, TimeUnit.DAYS));
+    offlineSegmentZKMetadata.setTimeUnit(timeUnit);
+    offlineSegmentZKMetadata.setTotalDocs(totalDocs);
+    ZKMetadataProvider
+        .setOfflineSegmentZKMetadata(_propertyStore, TableNameBuilder.OFFLINE.tableNameWithType(rawTableName),
+            offlineSegmentZKMetadata);
+  }
+
   private void verifyTimeBoundaryInfo(TimeBoundaryInfo timeBoundaryInfo, long expectedTimeValue) {
     assertNotNull(timeBoundaryInfo);
     assertEquals(timeBoundaryInfo.getTimeColumn(), TIME_COLUMN);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 42d50d0..ece2c40 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -507,10 +507,17 @@ public class PinotLLCRealtimeSegmentManager {
     committingSegmentZKMetadata.setDownloadUrl(isPeerURL(committingSegmentDescriptor.getSegmentLocation())
         ? CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD : committingSegmentDescriptor.getSegmentLocation());
     committingSegmentZKMetadata.setCrc(Long.valueOf(segmentMetadata.getCrc()));
-    Preconditions.checkNotNull(segmentMetadata.getTimeInterval(),
-        "start/end time information is not correctly written to the segment for table: " + realtimeTableName);
-    committingSegmentZKMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
-    committingSegmentZKMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
+    if (segmentMetadata.getTotalDocs() > 0) {
+      Preconditions.checkNotNull(segmentMetadata.getTimeInterval(),
+          "start/end time information is not correctly written to the segment for table: " + realtimeTableName);
+      committingSegmentZKMetadata.setStartTime(segmentMetadata.getTimeInterval().getStartMillis());
+      committingSegmentZKMetadata.setEndTime(segmentMetadata.getTimeInterval().getEndMillis());
+    } else {
+      // Set current time as start/end time if total docs is 0
+      long now = System.currentTimeMillis();
+      committingSegmentZKMetadata.setStartTime(now);
+      committingSegmentZKMetadata.setEndTime(now);
+    }
     committingSegmentZKMetadata.setTimeUnit(TimeUnit.MILLISECONDS);
     committingSegmentZKMetadata.setIndexVersion(segmentMetadata.getVersion());
     committingSegmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
index a2e27c1..9b99813 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java
@@ -53,12 +53,12 @@ public class SegmentMetadataMockUtils {
 
   public static SegmentMetadata mockSegmentMetadata(String tableName) {
     String uniqueNumericString = Long.toString(System.nanoTime());
-    return mockSegmentMetadata(tableName, tableName + uniqueNumericString, 0, uniqueNumericString);
+    return mockSegmentMetadata(tableName, tableName + uniqueNumericString, 100, uniqueNumericString);
   }
 
   public static SegmentMetadata mockSegmentMetadata(String tableName, String segmentName) {
     String uniqueNumericString = Long.toString(System.nanoTime());
-    return mockSegmentMetadata(tableName, segmentName, 0, uniqueNumericString);
+    return mockSegmentMetadata(tableName, segmentName, 100, uniqueNumericString);
   }
 
   public static RealtimeSegmentZKMetadata mockRealtimeSegmentZKMetadata(String tableName, String segmentName,
@@ -90,7 +90,7 @@ public class SegmentMetadataMockUtils {
     SegmentMetadata segmentMetadata = Mockito.mock(SegmentMetadata.class);
     Mockito.when(segmentMetadata.getTableName()).thenReturn(tableName);
     Mockito.when(segmentMetadata.getName()).thenReturn(segmentName);
-    Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(0);
+    Mockito.when(segmentMetadata.getTotalDocs()).thenReturn(10);
     Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
     Mockito.when(segmentMetadata.getPushTime()).thenReturn(Long.MIN_VALUE);
     Mockito.when(segmentMetadata.getRefreshTime()).thenReturn(Long.MIN_VALUE);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index 312cda7..c3c2db6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -75,34 +76,37 @@ public class PinotSegmentRecordReader implements RecordReader {
     try {
       SegmentMetadata segmentMetadata = _immutableSegment.getSegmentMetadata();
       _numDocs = segmentMetadata.getTotalDocs();
-      if (schema == null) {
-        // In order not to expose virtual columns to client, schema shouldn't be fetched from segmentMetadata;
-        // otherwise the original metadata will be modified. Hence, initialize a new schema.
-        _schema = new SegmentMetadataImpl(indexDir).getSchema();
-        Collection<String> columnNames = _schema.getColumnNames();
-        _columnReaderMap = new HashMap<>(columnNames.size());
-        for (String columnName : columnNames) {
-          _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+      // In order not to expose virtual columns to client, schema shouldn't be fetched from segmentMetadata;
+      // otherwise the original metadata will be modified. Hence, initialize a new schema.
+      _schema = schema == null ? new SegmentMetadataImpl(indexDir).getSchema() : schema;
+      if (_numDocs > 0) {
+        _columnReaderMap = new HashMap<>();
+        if (schema == null) {
+          Collection<String> columnNames = _schema.getColumnNames();
+          for (String columnName : columnNames) {
+            _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+          }
+        } else {
+          Schema segmentSchema = segmentMetadata.getSchema();
+          Collection<FieldSpec> fieldSpecs = _schema.getAllFieldSpecs();
+          for (FieldSpec fieldSpec : fieldSpecs) {
+            String columnName = fieldSpec.getName();
+            FieldSpec segmentFieldSpec = segmentSchema.getFieldSpecFor(columnName);
+            Preconditions.checkState(fieldSpec.equals(segmentFieldSpec),
+                "Field spec mismatch for column: %s, in the given schema: %s, in the segment schema: %s", columnName,
+                fieldSpec, segmentFieldSpec);
+            _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+          }
         }
-      } else {
-        _schema = schema;
-        Schema segmentSchema = segmentMetadata.getSchema();
-        Collection<FieldSpec> fieldSpecs = _schema.getAllFieldSpecs();
-        _columnReaderMap = new HashMap<>(fieldSpecs.size());
-        for (FieldSpec fieldSpec : fieldSpecs) {
-          String columnName = fieldSpec.getName();
-          FieldSpec segmentFieldSpec = segmentSchema.getFieldSpecFor(columnName);
-          Preconditions.checkState(fieldSpec.equals(segmentFieldSpec),
-              "Field spec mismatch for column: %s, in the given schema: %s, in the segment schema: %s", columnName,
-              fieldSpec, segmentFieldSpec);
-          _columnReaderMap.put(columnName, new PinotSegmentColumnReader(_immutableSegment, columnName));
+        // Initialize sorted doc ids
+        if (sortOrder != null && !sortOrder.isEmpty()) {
+          _docIdsInSortedColumnOrder =
+              new PinotSegmentSorter(_numDocs, _schema, _columnReaderMap).getSortedDocIds(sortOrder);
+        } else {
+          _docIdsInSortedColumnOrder = null;
         }
-      }
-      // Initialize sorted doc ids
-      if (sortOrder != null && !sortOrder.isEmpty()) {
-        _docIdsInSortedColumnOrder =
-            new PinotSegmentSorter(_numDocs, _schema, _columnReaderMap).getSortedDocIds(sortOrder);
       } else {
+        _columnReaderMap = Collections.emptyMap();
         _docIdsInSortedColumnOrder = null;
       }
     } catch (Exception e) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/EmptyIndexSegment.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/EmptyIndexSegment.java
new file mode 100644
index 0000000..a11dc46
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/EmptyIndexSegment.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.indexsegment.immutable;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.segment.index.datasource.EmptyDataSource;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.ForwardIndexReader;
+import org.apache.pinot.core.segment.index.readers.InvertedIndexReader;
+import org.apache.pinot.core.segment.index.readers.ValidDocIndexReader;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Immutable segment implementation for an empty segment (a segment with 0 rows).
+ * Such an IndexSegment contains only the metadata, and no indexes.
+ */
+public class EmptyIndexSegment implements ImmutableSegment {
+  private static final Logger LOGGER = LoggerFactory.getLogger(EmptyIndexSegment.class);
+
+  private final SegmentMetadataImpl _segmentMetadata;
+
+  public EmptyIndexSegment(SegmentMetadataImpl segmentMetadata) {
+    _segmentMetadata = segmentMetadata;
+  }
+
+  @Override
+  public String getSegmentName() {
+    return _segmentMetadata.getName();
+  }
+
+  @Override
+  public SegmentMetadataImpl getSegmentMetadata() {
+    return _segmentMetadata;
+  }
+
+  @Override
+  public DataSource getDataSource(String column) {
+    ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
+    Preconditions.checkNotNull(columnMetadata,
+        "ColumnMetadata for " + column + " should not be null. " + "Potentially invalid column name specified.");
+    return new EmptyDataSource(columnMetadata);
+  }
+
+  @Override
+  public Set<String> getColumnNames() {
+    return _segmentMetadata.getSchema().getColumnNames();
+  }
+
+  @Override
+  public Set<String> getPhysicalColumnNames() {
+    return _segmentMetadata.getSchema().getPhysicalColumnNames();
+  }
+
+  @Override
+  public void destroy() {
+  }
+
+  @Override
+  public List<StarTreeV2> getStarTrees() {
+    return null;
+  }
+
+  @Nullable
+  @Override
+  public ValidDocIndexReader getValidDocIndex() {
+    return null;
+  }
+
+  @Override
+  public GenericRow getRecord(int docId, GenericRow reuse) {
+    // NOTE: Use PinotSegmentRecordReader to read immutable segment
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Dictionary getDictionary(String column) {
+    return null;
+  }
+
+  @Override
+  public ForwardIndexReader getForwardIndex(String column) {
+    return null;
+  }
+
+  @Override
+  public InvertedIndexReader getInvertedIndex(String column) {
+    return null;
+  }
+
+  @Override
+  public long getSegmentSizeBytes() {
+    return 0;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
index b7b1fe0..59dec61 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/immutable/ImmutableSegmentLoader.java
@@ -101,6 +101,9 @@ public class ImmutableSegmentLoader {
 
     // Load the metadata again since converter and pre-processor may have changed it
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(indexDir);
+    if (segmentMetadata.getTotalDocs() == 0) {
+      return new EmptyIndexSegment(segmentMetadata);
+    }
 
     // Load the segment
     ReadMode readMode = indexLoadingConfig.getReadMode();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index be9f092..ff7dc01 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.query.config.SegmentPrunerConfig;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.env.PinotConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,6 +31,7 @@ import org.slf4j.LoggerFactory;
 /**
  * The <code>SegmentPrunerService</code> class contains multiple segment pruners and provides service to prune segments
  * against all pruners.
+ * {@link ValidSegmentPruner} is always set as the first pruner
  */
 public class SegmentPrunerService {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPrunerService.class);
@@ -38,11 +40,18 @@ public class SegmentPrunerService {
 
   public SegmentPrunerService(SegmentPrunerConfig config) {
     int numPruners = config.numSegmentPruners();
-    _segmentPruners = new ArrayList<>(numPruners);
+    _segmentPruners = new ArrayList<>(numPruners + 1);
+
+    String validSegmentPrunerName = ValidSegmentPruner.class.getSimpleName();
+    _segmentPruners.add(SegmentPrunerProvider.getSegmentPruner(validSegmentPrunerName, new PinotConfiguration()));
+
     for (int i = 0; i < numPruners; i++) {
-      LOGGER.info("Adding segment pruner: " + config.getSegmentPrunerName(i));
-      _segmentPruners.add(
-          SegmentPrunerProvider.getSegmentPruner(config.getSegmentPrunerName(i), config.getSegmentPrunerConfig(i)));
+      String segmentPrunerName = config.getSegmentPrunerName(i);
+      if (!validSegmentPrunerName.equalsIgnoreCase(segmentPrunerName)) {
+        LOGGER.info("Adding segment pruner: " + segmentPrunerName);
+        _segmentPruners
+            .add(SegmentPrunerProvider.getSegmentPruner(segmentPrunerName, config.getSegmentPrunerConfig(i)));
+      }
     }
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index 1be48c4..cc1750e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -40,7 +40,8 @@ public class RealtimeSegmentRecordReader implements RecordReader {
   public RealtimeSegmentRecordReader(MutableSegmentImpl realtimeSegment, @Nullable String sortedColumn) {
     _realtimeSegment = realtimeSegment;
     _numDocs = realtimeSegment.getNumDocsIndexed();
-    _sortedDocIdIterationOrder = sortedColumn != null ? realtimeSegment.getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
+    _sortedDocIdIterationOrder = sortedColumn != null && realtimeSegment.getNumDocsIndexed() > 0 ? realtimeSegment
+        .getSortedDocIdIterationOrderWithSortedColumn(sortedColumn) : null;
   }
 
   public int[] getSortedDocIdIterationOrder() {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index f0dc321..9041632 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -127,6 +127,9 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
 
     this.schema = schema;
     this.totalDocs = segmentIndexCreationInfo.getTotalDocs();
+    if (totalDocs == 0) {
+      return;
+    }
 
     Collection<FieldSpec> fieldSpecs = schema.getAllFieldSpecs();
     Set<String> invertedIndexColumns = new HashSet<>();
@@ -544,20 +547,34 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
           endTime = Long.parseLong(config.getEndTime());
           timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
         } else {
-          String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
-          String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
-
-          if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
-            // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
-            DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(config.getSimpleDateFormat());
-            startTime = dateTimeFormatter.parseMillis(startTimeStr);
-            endTime = dateTimeFormatter.parseMillis(endTimeStr);
-            timeUnit = TimeUnit.MILLISECONDS;
+          if (totalDocs > 0) {
+            String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
+            String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
+
+            if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+              // For TimeColumnType.SIMPLE_DATE, convert time value into millis since epoch
+              DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(config.getSimpleDateFormat());
+              startTime = dateTimeFormatter.parseMillis(startTimeStr);
+              endTime = dateTimeFormatter.parseMillis(endTimeStr);
+              timeUnit = TimeUnit.MILLISECONDS;
+            } else {
+              // by default, time column type is TimeColumnType.EPOCH
+              startTime = Long.parseLong(startTimeStr);
+              endTime = Long.parseLong(endTimeStr);
+              timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+            }
           } else {
-            // by default, time column type is TimeColumnType.EPOCH
-            startTime = Long.parseLong(startTimeStr);
-            endTime = Long.parseLong(endTimeStr);
-            timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+            // No records in segment. Use current time as start/end
+            long now = System.currentTimeMillis();
+            if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+              startTime = now;
+              endTime = now;
+              timeUnit = TimeUnit.MILLISECONDS;
+            } else {
+              timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+              startTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+              endTime = timeUnit.convert(now, TimeUnit.MILLISECONDS);
+            }
           }
         }
 
@@ -656,10 +673,12 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     }
 
     // NOTE: Min/max could be null for real-time aggregate metrics.
-    Object min = columnIndexCreationInfo.getMin();
-    Object max = columnIndexCreationInfo.getMax();
-    if (min != null && max != null) {
-      addColumnMinMaxValueInfo(properties, column, min.toString(), max.toString());
+    if (totalDocs > 0) {
+      Object min = columnIndexCreationInfo.getMin();
+      Object max = columnIndexCreationInfo.getMax();
+      if (min != null && max != null) {
+        addColumnMinMaxValueInfo(properties, column, min.toString(), max.toString());
+      }
     }
 
     String defaultNullValue = columnIndexCreationInfo.getDefaultNullValue().toString();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 3078dc7..19ec420 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -145,7 +145,6 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
       throws Exception {
     this.config = config;
     recordReader = dataSource.getRecordReader();
-    Preconditions.checkState(recordReader.hasNext(), "No record in data source");
     dataSchema = config.getSchema();
 
     _recordTransformer = recordTransformer;
@@ -240,8 +239,14 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
     ColumnStatistics timeColumnStatistics = segmentStats.getColumnProfileFor(config.getTimeColumnName());
     int sequenceId = config.getSequenceId();
     if (timeColumnStatistics != null) {
-      segmentName = config.getSegmentNameGenerator()
-          .generateSegmentName(sequenceId, timeColumnStatistics.getMinValue(), timeColumnStatistics.getMaxValue());
+      if (totalDocs > 0) {
+        segmentName = config.getSegmentNameGenerator()
+            .generateSegmentName(sequenceId, timeColumnStatistics.getMinValue(), timeColumnStatistics.getMaxValue());
+      } else {
+        // No rows, so no min/max time values: use the current time for both when generating the segment name
+        long now = System.currentTimeMillis();
+        segmentName = config.getSegmentNameGenerator().generateSegmentName(sequenceId, now, now);
+      }
     } else {
       segmentName = config.getSegmentNameGenerator().generateSegmentName(sequenceId, null, null);
     }
@@ -272,7 +277,9 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
     convertFormatIfNecessary(segmentOutputDir);
 
     // Build star-tree V2 if necessary
-    buildStarTreeV2IfNecessary(segmentOutputDir);
+    if (totalDocs > 0) {
+      buildStarTreeV2IfNecessary(segmentOutputDir);
+    }
 
     // Compute CRC and creation time
     long crc = CrcUtils.forAllFilesInFolder(segmentOutputDir).computeCrc();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/EmptyDataSource.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/EmptyDataSource.java
new file mode 100644
index 0000000..8eaf1d8
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/EmptyDataSource.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.datasource;
+
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.common.DataSourceMetadata;
+import org.apache.pinot.core.data.partition.PartitionFunction;
+import org.apache.pinot.core.segment.index.metadata.ColumnMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+
+
+/**
+ * The {@code EmptyDataSource} class is the data source for a column in the immutable segment with 0 rows.
+ */
+public class EmptyDataSource extends BaseDataSource {
+
+  public EmptyDataSource(ColumnMetadata columnMetadata) {
+    super(new EmptyDataSourceMetadata(columnMetadata), null, null, null, null, null, null, null, null, null, null);
+  }
+
+  private static class EmptyDataSourceMetadata implements DataSourceMetadata {
+    final FieldSpec _fieldSpec;
+
+    EmptyDataSourceMetadata(ColumnMetadata columnMetadata) {
+      _fieldSpec = columnMetadata.getFieldSpec();
+    }
+
+    @Override
+    public FieldSpec getFieldSpec() {
+      return _fieldSpec;
+    }
+
+    @Override
+    public boolean isSorted() {
+      return false;
+    }
+
+    @Override
+    public int getNumDocs() {
+      return 0;
+    }
+
+    @Override
+    public int getNumValues() {
+      return 0;
+    }
+
+    @Override
+    public int getMaxNumValuesPerMVEntry() {
+      return -1;
+    }
+
+    @Nullable
+    @Override
+    public Comparable getMinValue() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public Comparable getMaxValue() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public PartitionFunction getPartitionFunction() {
+      return null;
+    }
+
+    @Nullable
+    @Override
+    public Set<Integer> getPartitions() {
+      return null;
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
index acfeae3..1055fd7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
@@ -51,6 +51,9 @@ public class RecordReaderSampleDataTest {
   private static final File JSON_SAMPLE_DATA_FILE = new File(Preconditions
       .checkNotNull(RecordReaderSampleDataTest.class.getClassLoader().getResource("data/test_sample_data.json"))
       .getFile());
+  private static final File JSON_EMPTY_DATA_FILE = new File(Preconditions
+      .checkNotNull(RecordReaderSampleDataTest.class.getClassLoader().getResource("data/test_empty_data.json"))
+      .getFile());
   private final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension("column1", FieldSpec.DataType.LONG)
       .addSingleValueDimension("column2", FieldSpec.DataType.INT)
       .addSingleValueDimension("column3", FieldSpec.DataType.STRING)
@@ -106,6 +109,15 @@ public class RecordReaderSampleDataTest {
     }
   }
 
+  @Test
+  public void testRecordReaderEmptyFile()
+      throws Exception {
+    try (RecordReader jsonRecordReader = RecordReaderFactory
+        .getRecordReader(FileFormat.JSON, JSON_EMPTY_DATA_FILE, SCHEMA.getColumnNames(), null)) {
+      Assert.assertFalse(jsonRecordReader.hasNext());
+    }
+  }
+
   /**
    * Tests that record extractor is able to handle missing fields correctly (incoming and outgoing are missing from data)
    * @throws Exception
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNoRecordsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNoRecordsTest.java
new file mode 100644
index 0000000..e1b682d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithNoRecordsTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.index.creator;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.data.readers.GenericRowRecordReader;
+import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.core.segment.store.SegmentDirectory;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests segment generation when the input record reader has no records
+ */
+public class SegmentGenerationWithNoRecordsTest {
+  private static final String STRING_COLUMN1 = "string_col1";
+  private static final String STRING_COLUMN2 = "string_col2";
+  private static final String STRING_COLUMN3 = "string_col3";
+  private static final String STRING_COLUMN4 = "string_col4";
+  private static final String LONG_COLUMN1 = "long_col1";
+  private static final String LONG_COLUMN2 = "long_col2";
+  private static final String LONG_COLUMN3 = "long_col3";
+  private static final String LONG_COLUMN4 = "long_col4";
+  private static final String MV_INT_COLUMN = "mv_col";
+  private static final String DATE_TIME_COLUMN = "date_time_col";
+  private static final String SEGMENT_DIR_NAME =
+      FileUtils.getTempDirectoryPath() + File.separator + "segmentNoRecordsTest";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private Schema _schema;
+  private TableConfig _tableConfig;
+
+  @BeforeClass
+  public void setup() {
+    _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+        .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
+        .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+        .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2)).setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+        .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).build();
+    _tableConfig.getIndexingConfig().setEnableDefaultStarTree(true);
+    _schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+        .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+        .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+        .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+  }
+
+  @BeforeMethod
+  public void reset() {
+    FileUtils.deleteQuietly(new File(SEGMENT_DIR_NAME));
+  }
+
+  @Test
+  public void testNumDocs()
+      throws Exception {
+    File segmentDir = buildSegment(_tableConfig, _schema);
+    SegmentMetadataImpl metadata = SegmentDirectory.loadSegmentMetadata(segmentDir);
+    Assert.assertEquals(metadata.getTotalDocs(), 0);
+    Assert.assertEquals(metadata.getTimeColumn(), DATE_TIME_COLUMN);
+    Assert.assertEquals(metadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+    Assert.assertEquals(metadata.getStartTime(), metadata.getEndTime());
+    Assert.assertTrue(metadata.getAllColumns().containsAll(_schema.getColumnNames()));
+    PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader(segmentDir);
+    Assert.assertFalse(segmentRecordReader.hasNext());
+  }
+
+  private File buildSegment(final TableConfig tableConfig, final Schema schema)
+      throws Exception {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+    config.setOutDir(SEGMENT_DIR_NAME);
+    config.setSegmentName(SEGMENT_NAME);
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(config, new GenericRowRecordReader(Collections.emptyList()));
+    driver.build();
+    driver.getOutputDirectory().deleteOnExit();
+    return driver.getOutputDirectory();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
index c28a2d8..fa00339 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/executor/QueryExecutorTest.java
@@ -51,8 +51,13 @@ import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.IngestionSchemaValidator;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -63,10 +68,12 @@ import com.yammer.metrics.core.MetricsRegistry;
 
 public class QueryExecutorTest {
   private static final String AVRO_DATA_PATH = "data/simpleData200001.avro";
+  private static final String EMPTY_JSON_DATA_PATH = "data/test_empty_data.json";
   private static final String QUERY_EXECUTOR_CONFIG_PATH = "conf/query-executor.properties";
   private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "QueryExecutorTest");
   private static final String TABLE_NAME = "testTable";
   private static final int NUM_SEGMENTS_TO_GENERATE = 2;
+  private static final int NUM_EMPTY_SEGMENTS_TO_GENERATE = 2;
   private static final Pql2Compiler COMPILER = new Pql2Compiler();
   private static final ExecutorService QUERY_RUNNERS = Executors.newFixedThreadPool(20);
 
@@ -85,9 +92,12 @@ public class QueryExecutorTest {
     URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH);
     Assert.assertNotNull(resourceUrl);
     File avroFile = new File(resourceUrl.getFile());
-    for (int i = 0; i < NUM_SEGMENTS_TO_GENERATE; i++) {
+    Schema schema = SegmentTestUtils.extractSchemaFromAvroWithoutTime(avroFile);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
+    int i = 0;
+    for (; i < NUM_SEGMENTS_TO_GENERATE; i++) {
       SegmentGeneratorConfig config =
-          SegmentTestUtils.getSegmentGeneratorConfigWithoutTimeColumn(avroFile, INDEX_DIR, TABLE_NAME);
+          SegmentTestUtils.getSegmentGeneratorConfig(avroFile, FileFormat.AVRO, INDEX_DIR, TABLE_NAME, tableConfig, schema);
       config.setSegmentNamePostfix(Integer.toString(i));
       SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
       driver.init(config);
@@ -100,6 +110,19 @@ public class QueryExecutorTest {
       _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
       _segmentNames.add(driver.getSegmentName());
     }
+    resourceUrl = getClass().getClassLoader().getResource(EMPTY_JSON_DATA_PATH);
+    Assert.assertNotNull(resourceUrl);
+    File jsonFile = new File(resourceUrl.getFile());
+    for (; i < NUM_SEGMENTS_TO_GENERATE + NUM_EMPTY_SEGMENTS_TO_GENERATE; i++) {
+      SegmentGeneratorConfig config =
+          SegmentTestUtils.getSegmentGeneratorConfig(jsonFile, FileFormat.JSON, INDEX_DIR, TABLE_NAME, tableConfig, schema);
+      config.setSegmentNamePostfix(Integer.toString(i));
+      SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+      driver.init(config);
+      driver.build();
+      _indexSegments.add(ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.mmap));
+      _segmentNames.add(driver.getSegmentName());
+    }
 
     // Mock the instance data manager
     _serverMetrics = new ServerMetrics(new MetricsRegistry());
diff --git a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
index 71f42fa..430f597 100644
--- a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -18,16 +18,26 @@
  */
 package org.apache.pinot.realtime.converter;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import java.io.File;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
+import org.apache.pinot.core.indexsegment.generator.SegmentVersion;
+import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
+import org.apache.pinot.core.io.writer.impl.DirectMemoryManager;
 import org.apache.pinot.core.realtime.converter.RealtimeSegmentConverter;
+import org.apache.pinot.core.realtime.impl.RealtimeSegmentConfig;
+import org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
+import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.testng.Assert;
@@ -36,6 +46,23 @@ import org.testng.annotations.Test;
 
 public class RealtimeSegmentConverterTest {
 
+  // Column names of the schema built in testNoRecordsIndexed().
+  private static final String STRING_COLUMN1 = "string_col1";
+  private static final String STRING_COLUMN2 = "string_col2";
+  private static final String STRING_COLUMN3 = "string_col3";
+  private static final String STRING_COLUMN4 = "string_col4";
+  private static final String LONG_COLUMN1 = "long_col1";
+  private static final String LONG_COLUMN2 = "long_col2";
+  private static final String LONG_COLUMN3 = "long_col3";
+  private static final String LONG_COLUMN4 = "long_col4";
+  private static final String MV_INT_COLUMN = "mv_col";
+  private static final String DATE_TIME_COLUMN = "date_time_col";
+
+  // Scratch directory for this test class, located under the system temp directory and named after the class.
+  private static final File TMP_DIR = new File(FileUtils.getTempDirectory(), RealtimeSegmentConverterTest.class.getName());
+
+  /**
+   * Creates an empty {@code TMP_DIR} before any test of this class runs.
+   *
+   * <p>TestNG only invokes configuration methods that carry a configuration annotation. The annotation is
+   * written fully qualified so that no additional import is needed.
+   */
+  @org.testng.annotations.BeforeClass
+  public void setup() {
+    // Clear leftovers of an aborted earlier run first: mkdirs() returns false for an existing directory.
+    FileUtils.deleteQuietly(TMP_DIR);
+    Preconditions.checkState(TMP_DIR.mkdirs(), "Failed to create temp directory: %s", TMP_DIR);
+  }
+
   @Test
   public void testNoVirtualColumnsInSchema() {
     Schema schema = new Schema.SchemaBuilder().addSingleValueDimension("col1", FieldSpec.DataType.STRING)
@@ -47,4 +74,74 @@ public class RealtimeSegmentConverterTest {
     Schema newSchema = RealtimeSegmentConverter.getUpdatedSchema(schema);
     Assert.assertEquals(newSchema.getColumnNames().size(), 2);
   }
+
+  /**
+   * Converts a mutable segment into which no record was ever indexed and verifies the converted segment:
+   * total docs is 0, the time column and its unit are recorded, start and end time are equal, and every
+   * schema column appears in the metadata.
+   */
+  @Test
+  public void testNoRecordsIndexed()
+      throws Exception {
+
+    File tmpDir = new File(TMP_DIR, "tmp_" + System.currentTimeMillis());
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName(DATE_TIME_COLUMN)
+        .setInvertedIndexColumns(Lists.newArrayList(STRING_COLUMN1)).setSortedColumn(LONG_COLUMN1)
+        .setRangeIndexColumns(Lists.newArrayList(STRING_COLUMN2))
+        .setNoDictionaryColumns(Lists.newArrayList(LONG_COLUMN2)).setVarLengthDictionaryColumns(Lists.newArrayList(STRING_COLUMN3))
+        .setOnHeapDictionaryColumns(Lists.newArrayList(LONG_COLUMN3)).build();
+    Schema schema = new Schema.SchemaBuilder()
+        .addSingleValueDimension(STRING_COLUMN1, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN2, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN3, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(STRING_COLUMN4, FieldSpec.DataType.STRING)
+        .addSingleValueDimension(LONG_COLUMN1, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN2, FieldSpec.DataType.LONG)
+        .addSingleValueDimension(LONG_COLUMN3, FieldSpec.DataType.LONG)
+        .addMultiValueDimension(MV_INT_COLUMN, FieldSpec.DataType.INT)
+        .addMetric(LONG_COLUMN4, FieldSpec.DataType.LONG)
+        .addDateTime(DATE_TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS")
+        .build();
+
+    String tableNameWithType = tableConfig.getTableName();
+    String segmentName = "testTable__0__0__123456";
+    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
+
+    RealtimeSegmentConfig.Builder realtimeSegmentConfigBuilder =
+        new RealtimeSegmentConfig.Builder().setTableNameWithType(tableNameWithType).setSegmentName(segmentName)
+            .setStreamName(tableNameWithType).setSchema(schema).setTimeColumnName(DATE_TIME_COLUMN)
+            .setCapacity(1000).setAvgNumMultiValues(3)
+            .setNoDictionaryColumns(Sets.newHashSet(LONG_COLUMN2))
+            .setVarLengthDictionaryColumns(Sets.newHashSet(STRING_COLUMN3))
+            .setInvertedIndexColumns(Sets.newHashSet(STRING_COLUMN1))
+            .setRealtimeSegmentZKMetadata(getRealtimeSegmentZKMetadata(segmentName))
+            .setOffHeap(true).setMemoryManager(new DirectMemoryManager(segmentName))
+            .setStatsHistory(RealtimeSegmentStatsHistory.deserialzeFrom(new File(tmpDir, "stats")))
+            .setConsumerDir(new File(tmpDir, "consumerDir").getAbsolutePath());
+
+    // create mutable segment impl; nothing is indexed into it before the conversion below
+    MutableSegmentImpl mutableSegmentImpl = new MutableSegmentImpl(realtimeSegmentConfigBuilder.build(), null);
+    try {
+      File outputDir = new File(tmpDir, "outputDir");
+      RealtimeSegmentConverter converter =
+          new RealtimeSegmentConverter(mutableSegmentImpl, outputDir.getAbsolutePath(), schema,
+              tableNameWithType, tableConfig, segmentName, indexingConfig.getSortedColumn().get(0),
+              indexingConfig.getInvertedIndexColumns(), null, null, indexingConfig.getNoDictionaryColumns(),
+              indexingConfig.getVarLengthDictionaryColumns(), false);
+
+      converter.build(SegmentVersion.v3, null);
+      SegmentMetadataImpl metadata = new SegmentMetadataImpl(new File(outputDir, segmentName));
+      Assert.assertEquals(metadata.getTotalDocs(), 0);
+      Assert.assertEquals(metadata.getTimeColumn(), DATE_TIME_COLUMN);
+      Assert.assertEquals(metadata.getTimeUnit(), TimeUnit.MILLISECONDS);
+      Assert.assertEquals(metadata.getStartTime(), metadata.getEndTime());
+      Assert.assertTrue(metadata.getAllColumns().containsAll(schema.getColumnNames()));
+    } finally {
+      // The segment is configured off-heap above; destroy it even when an assertion fails so that
+      // its resources do not outlive the test.
+      mutableSegmentImpl.destroy();
+    }
+  }
+
+  /**
+   * Creates realtime segment ZK metadata that carries only the current wall-clock time as creation time
+   * and the given segment name.
+   *
+   * @param segmentName name to store in the metadata
+   * @return the newly created metadata object
+   */
+  private RealtimeSegmentZKMetadata getRealtimeSegmentZKMetadata(String segmentName) {
+    RealtimeSegmentZKMetadata zkMetadata = new RealtimeSegmentZKMetadata();
+    long creationTimeMs = System.currentTimeMillis();
+    zkMetadata.setCreationTime(creationTimeMs);
+    zkMetadata.setSegmentName(segmentName);
+    return zkMetadata;
+  }
+
+  /**
+   * Deletes {@code TMP_DIR} after all tests of this class have run.
+   *
+   * <p>TestNG only invokes configuration methods that carry a configuration annotation. The annotation is
+   * written fully qualified so that no additional import is needed.
+   */
+  @org.testng.annotations.AfterClass
+  public void destroy() {
+    FileUtils.deleteQuietly(TMP_DIR);
+  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java
index ae8f245..38da512 100644
--- a/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/segments/v1/creator/SegmentTestUtils.java
@@ -94,6 +94,16 @@ public class SegmentTestUtils {
     return segmentGeneratorConfig;
   }
 
+  /**
+   * Creates a {@link SegmentGeneratorConfig} from the given table config and schema, then sets the input
+   * file path, output directory, input format and table name on it.
+   *
+   * @param inputFile input data file; its absolute path is set as the input file path
+   * @param inputFormat format of {@code inputFile}
+   * @param outputDir output directory; its absolute path is set as the out dir
+   * @param tableName table name to set on the config
+   * @param tableConfig table config passed to the {@link SegmentGeneratorConfig} constructor
+   * @param schema schema passed to the {@link SegmentGeneratorConfig} constructor
+   * @return the populated config
+   */
+  public static SegmentGeneratorConfig getSegmentGeneratorConfig(File inputFile, FileFormat inputFormat, File outputDir,
+      String tableName, TableConfig tableConfig, Schema schema) {
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema);
+
+    String inputPath = inputFile.getAbsolutePath();
+    config.setInputFilePath(inputPath);
+
+    String outputPath = outputDir.getAbsolutePath();
+    config.setOutDir(outputPath);
+
+    config.setFormat(inputFormat);
+    config.setTableName(tableName);
+    return config;
+  }
+
   public static Schema extractSchemaFromAvroWithoutTime(File avroFile)
       throws IOException {
     DataFileStream<GenericRecord> dataStream =
diff --git a/pinot-core/src/test/resources/data/test_empty_data.json b/pinot-core/src/test/resources/data/test_empty_data.json
new file mode 100644
index 0000000..e69de29


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org