You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/21 01:17:49 UTC

[incubator-pinot] branch sharded_consumer_type_support_with_kinesis updated: Add tests for end-of-life cases

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
     new 467965b  Add tests for end-of-life cases
467965b is described below

commit 467965b141dd86ca193d6634342d6794943165e0
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Jan 20 17:17:18 2021 -0800

    Add tests for end-of-life cases
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  26 ++--
 .../PinotLLCRealtimeSegmentManagerTest.java        | 162 +++++++++++++++++----
 .../realtime/LLRealtimeSegmentDataManagerTest.java |  10 +-
 3 files changed, 155 insertions(+), 43 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 3a3dcf6..f86f687 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -515,21 +515,20 @@ public class PinotLLCRealtimeSegmentManager {
     // If there were no splits/merges we would receive A,B
     List<PartitionGroupInfo> newPartitionGroupInfoList =
         getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+    Set<Integer> newPartitionGroupSet =
+        newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
     int numPartitions = newPartitionGroupInfoList.size();
 
     // Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata
     String newConsumingSegmentName = null;
     String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
     long newSegmentCreationTimeMs = getCurrentTimeMs();
-    for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
-      if (partitionGroupInfo.getPartitionGroupId() == committingSegmentPartitionGroupId) {
-        LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
-            committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
-        createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
-            committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
-        newConsumingSegmentName = newLLCSegment.getSegmentName();
-        break;
-      }
+    if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+      LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+          committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+      createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+          committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+      newConsumingSegmentName = newLLCSegment.getSegmentName();
     }
 
     // TODO: create new partition groups also here
@@ -924,7 +923,10 @@ public class PinotLLCRealtimeSegmentManager {
    * a) metadata status is IN_PROGRESS, segment state is CONSUMING - happy path
    * b) metadata status is IN_PROGRESS, segment state is OFFLINE - create new metadata and new CONSUMING segment
    * c) metadata status is DONE, segment state is OFFLINE - create new metadata and new CONSUMING segment
-   * d) metadata status is DONE, segment state is CONSUMING - create new metadata and new CONSUMING segment
+   * d) metadata status is DONE, segment state is CONSUMING -
+   * If the shard has not reached end of life, create new metadata and new CONSUMING segment. Update current segment to ONLINE in ideal state.
+   * If the shard has reached end of life, do not create new metadata or a new CONSUMING segment. Simply update current segment to ONLINE in ideal state.
+   *
    * 2) Segment is absent from ideal state - add new segment to ideal state
    *
    * Also checks if it is too soon to correct (could be in the process of committing segment)
@@ -966,8 +968,8 @@ public class PinotLLCRealtimeSegmentManager {
     // Possible things to repair:
     // 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING:
     //    a. Create metadata for next segment and find hosts to assign it to.
-    //    b. update current segment in idealstate to ONLINE
-    //    c. add new segment in idealstate to CONSUMING on the hosts.
+    //    b. update current segment in idealstate to ONLINE (also when partition is absent from newPartitionGroupInfo)
+    //    c. add new segment in idealstate to CONSUMING on the hosts (only if partition is present in newPartitionGroupInfo)
     // 2. The latest metadata is IN_PROGRESS, but segment is not there in idealstate.
     //    a. change prev segment to ONLINE in idealstate
     //    b. add latest segment to CONSUMING in idealstate.
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index ecbf2ef..e8309d3 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.controller.helix.core.realtime;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -58,7 +59,6 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupInfo;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -249,6 +249,49 @@ public class PinotLLCRealtimeSegmentManagerTest {
     } catch (IllegalStateException e) {
       // Expected
     }
+
+    // committing segment's partitionGroupId no longer in the newPartitionGroupInfoList
+    List<PartitionGroupInfo> partitionGroupInfoListWithout0 =
+        segmentManager.getPartitionGroupInfoList(segmentManager._streamConfig, Collections.emptyList());
+    partitionGroupInfoListWithout0.remove(0);
+    segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout0;
+
+    // Commit a segment for partition 0 - No new entries created for partition which reached end of life
+    committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName();
+    String committingSegmentStartOffset = segmentManager._segmentZKMetadataMap.get(committingSegment).getStartOffset();
+    String committingSegmentEndOffset =
+        new LongMsgOffset(Long.parseLong(committingSegmentStartOffset) + NUM_DOCS).toString();
+    committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment, committingSegmentEndOffset, 0L);
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+    int instanceStateMapSize = instanceStatesMap.size();
+    int metadataMapSize = segmentManager._segmentZKMetadataMap.size();
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+    // No changes in the number of ideal state or zk entries
+    assertEquals(instanceStatesMap.size(), instanceStateMapSize);
+    assertEquals(segmentManager._segmentZKMetadataMap.size(), metadataMapSize);
+
+    // Verify instance states: committed segment is ONLINE and no new consuming segment exists
+    committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
+    assertNotNull(committedSegmentInstanceStateMap);
+    assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+        Collections.singleton(SegmentStateModel.ONLINE));
+
+    consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName();
+    consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+    assertNull(consumingSegmentInstanceStateMap);
+
+    // Verify segment ZK metadata: committed segment is DONE and no new consuming segment metadata exists
+    committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
+    assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+    assertEquals(committedSegmentZKMetadata.getStartOffset(), committingSegmentStartOffset);
+    assertEquals(committedSegmentZKMetadata.getEndOffset(), committingSegmentEndOffset);
+    assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
+    assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+    assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION);
+    assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+
+    consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
+    assertNull(consumingSegmentZKMetadata);
   }
 
   /**
@@ -411,6 +454,20 @@ public class PinotLLCRealtimeSegmentManagerTest {
    *
    * 4. MaxSegmentCompletionTime: Segment completion has 5 minutes to retry and complete between steps 1 and 3.
    * Correction: Do not correct the segments before the allowed time for segment completion
+   *
+   *
+   * End-of-shard case:
+   * Additionally, shards of some streams may be detected as having reached end-of-life when committing.
+   * In such cases, step 2 is skipped, and step 3 is done partially (change committing segment state to ONLINE
+   * but don't create new segment with state CONSUMING)
+   *
+   * Scenarios:
+   * 1. Step 3 failed - we will find segment ZK metadata DONE, but ideal state CONSUMING
+   * Correction: Since shard has ended, do not create new segment ZK metadata, or new entry in ideal state.
+   * Simply update CONSUMING segment in ideal state to ONLINE
+   *
+   * 2. Shard which has reached EOL detected - we will find segment ZK metadata DONE and ideal state ONLINE
+   * Correction: No repair needed. Acceptable case.
    */
   @Test
   public void testRepairs() {
@@ -422,12 +479,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
     // Remove the CONSUMING segment from the ideal state for partition 0 (step 3 failed)
     String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
     removeNewConsumingSegment(instanceStatesMap, consumingSegment, null);
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed)
     removeNewConsumingSegment(instanceStatesMap, consumingSegment, null);
     assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // 2 partitions commit segment
     for (int partitionId = 0; partitionId < 2; partitionId++) {
@@ -442,12 +499,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
     consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
     String latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
     removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed)
     removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
     assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     /*
       Test all replicas of the new segment are OFFLINE
@@ -461,12 +518,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
     // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0
     consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
     turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 again
     consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
     turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // 2 partitions commit segment
     for (int partitionId = 0; partitionId < 2; partitionId++) {
@@ -484,22 +541,51 @@ public class PinotLLCRealtimeSegmentManagerTest {
     consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName();
     latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName();
     removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed)
     removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
     assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0
     consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName();
     turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
 
     // Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 again
     consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 4, CURRENT_TIME_MS).getSegmentName();
     turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
-    testRepairs(segmentManager);
+    testRepairs(segmentManager, Collections.emptyList());
+
+    /*
+     * End of shard cases
+     */
+    // Partition 1 reached end of shard.
+    List<PartitionGroupInfo> partitionGroupInfoListWithout1 =
+        segmentManager.getPartitionGroupInfoList(segmentManager._streamConfig, Collections.emptyList());
+    partitionGroupInfoListWithout1.remove(1);
+    segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout1;
+    // noop
+    testRepairs(segmentManager, Collections.emptyList());
+
+    // Partition 1 commits segment - should not create new metadata or CONSUMING segment
+    String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, CURRENT_TIME_MS).getSegmentName();
+    String startOffset = segmentManager._segmentZKMetadataMap.get(segmentName).getStartOffset();
+    CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName,
+        new LongMsgOffset(Long.parseLong(startOffset) + NUM_DOCS).toString(), 0L);
+    committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+    segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+    // ONLINE in IS and metadata DONE, but end of shard (not present in partition group list), so don't repair
+    testRepairs(segmentManager, Lists.newArrayList(1));
+
+    // Turn the last ONLINE segment of the shard back to CONSUMING (simulates a failure between step 1 and step 3)
+    segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout1;
+    consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, CURRENT_TIME_MS).getSegmentName();
+    turnNewConsumingSegmentConsuming(instanceStatesMap, consumingSegment);
+
+    // Repair turns the ideal state entry back to ONLINE, but creates no new entries, because the shard has ended.
+    testRepairs(segmentManager, Lists.newArrayList(1));
   }
 
   /**
@@ -539,7 +625,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
   }
 
-  private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager) {
+  /**
+   * Sets all instances of the segment to CONSUMING in place; the segment must already exist in the ideal state.
+   */
+  private void turnNewConsumingSegmentConsuming(Map<String, Map<String, String>> instanceStatesMap,
+      String consumingSegment) {
+    Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+    assertNotNull(consumingSegmentInstanceStateMap);
+    for (Map.Entry<String, String> entry : consumingSegmentInstanceStateMap.entrySet()) {
+      entry.setValue(SegmentStateModel.CONSUMING);
+    }
+  }
+
+  private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager, List<Integer> shardsEnded) {
     Map<String, Map<String, String>> oldInstanceStatesMap =
         cloneInstanceStatesMap(segmentManager._idealState.getRecord().getMapFields());
     segmentManager._exceededMaxSegmentCompletionTime = false;
@@ -547,7 +645,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     verifyNoChangeToOldEntries(segmentManager, oldInstanceStatesMap);
     segmentManager._exceededMaxSegmentCompletionTime = true;
     segmentManager.ensureAllPartitionsConsuming();
-    verifyRepairs(segmentManager);
+    verifyRepairs(segmentManager, shardsEnded);
   }
 
   /**
@@ -564,7 +662,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
   }
 
-  private void verifyRepairs(FakePinotLLCRealtimeSegmentManager segmentManager) {
+  private void verifyRepairs(FakePinotLLCRealtimeSegmentManager segmentManager, List<Integer> shardsEnded) {
     Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
 
     // Segments are the same for ideal state and ZK metadata
@@ -597,16 +695,18 @@ public class PinotLLCRealtimeSegmentManagerTest {
       int numSegments = segments.size();
 
       String latestSegment = segments.get(numSegments - 1);
-
-      // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state
       Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegment);
-      assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
-      assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
-
-      // Latest segment ZK metadata should be IN_PROGRESS
-      assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS);
+      if (!shardsEnded.contains(partitionId)) {
+        // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state
+        assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
+        assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
+
+        // Latest segment ZK metadata should be IN_PROGRESS
+        assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS);
+        numSegments --;
+      }
 
-      for (int i = 0; i < numSegments - 1; i++) {
+      for (int i = 0; i < numSegments; i++) {
         String segmentName = segments.get(i);
 
         // Committed segment should have all instances in ONLINE state
@@ -620,8 +720,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
         // Verify segment start/end offset
         assertEquals(segmentZKMetadata.getStartOffset(),
             new LongMsgOffset(PARTITION_OFFSET.getOffset() + i * (long) NUM_DOCS).toString());
-        assertEquals(segmentZKMetadata.getEndOffset(),
-            segmentManager._segmentZKMetadataMap.get(segments.get(i + 1)).getStartOffset());
+        if (shardsEnded.contains(partitionId) && ((i + 1) == numSegments)) {
+          assertEquals(Long.parseLong(segmentZKMetadata.getEndOffset()),
+              Long.parseLong(segmentZKMetadata.getStartOffset()) + NUM_DOCS);
+        } else {
+          assertEquals(segmentZKMetadata.getEndOffset(),
+              segmentManager._segmentZKMetadataMap.get(segments.get(i + 1)).getStartOffset());
+        }
       }
     }
   }
@@ -818,6 +923,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<String, Integer> _segmentZKMetadataVersionMap = new HashMap<>();
     IdealState _idealState;
     int _numPartitions;
+    List<PartitionGroupInfo> _partitionGroupInfoList = null;
     boolean _exceededMaxSegmentCompletionTime = false;
 
     FakePinotLLCRealtimeSegmentManager() {
@@ -919,9 +1025,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
     @Override
     List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
         List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
-      return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i,
-          PARTITION_OFFSET))
-          .collect(Collectors.toList());
+      if (_partitionGroupInfoList != null) {
+        return _partitionGroupInfoList;
+      } else {
+        return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, PARTITION_OFFSET)).collect(Collectors.toList());
+      }
     }
 
     @Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index d7aec8d..ae8b138 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
 import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -147,6 +148,7 @@ public class LLRealtimeSegmentDataManagerTest {
     segmentZKMetadata.setSegmentName(_segmentNameStr);
     segmentZKMetadata.setStartOffset(_startOffset.toString());
     segmentZKMetadata.setCreationTime(System.currentTimeMillis());
+    segmentZKMetadata.setStatus(Status.IN_PROGRESS);
     return segmentZKMetadata;
   }
 
@@ -771,7 +773,7 @@ public class LLRealtimeSegmentDataManagerTest {
     public Field _state;
     public Field _shouldStop;
     public Field _stopReason;
-    private Field _streamMsgOffsetFactory;
+    private final Field _checkpointFactory;
     public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
     public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>();
     public boolean _commitSegmentCalled = false;
@@ -810,9 +812,9 @@ public class LLRealtimeSegmentDataManagerTest {
       _stopReason = LLRealtimeSegmentDataManager.class.getDeclaredField("_stopReason");
       _stopReason.setAccessible(true);
       _semaphoreMap = semaphoreMap;
-      _streamMsgOffsetFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
-      _streamMsgOffsetFactory.setAccessible(true);
-      _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
+      _checkpointFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_checkpointFactory");
+      _checkpointFactory.setAccessible(true);
+      _checkpointFactory.set(this, new LongMsgOffsetFactory());
     }
 
     public String getStopReason() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org