You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2021/01/21 01:17:49 UTC
[incubator-pinot] branch sharded_consumer_type_support_with_kinesis
updated: Add tests for end-of-life cases
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch sharded_consumer_type_support_with_kinesis
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/sharded_consumer_type_support_with_kinesis by this push:
new 467965b Add tests for end-of-life cases
467965b is described below
commit 467965b141dd86ca193d6634342d6794943165e0
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Wed Jan 20 17:17:18 2021 -0800
Add tests for end-of-life cases
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 26 ++--
.../PinotLLCRealtimeSegmentManagerTest.java | 162 +++++++++++++++++----
.../realtime/LLRealtimeSegmentDataManagerTest.java | 10 +-
3 files changed, 155 insertions(+), 43 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 3a3dcf6..f86f687 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -515,21 +515,20 @@ public class PinotLLCRealtimeSegmentManager {
// If there were no splits/merges we would receive A,B
List<PartitionGroupInfo> newPartitionGroupInfoList =
getPartitionGroupInfoList(streamConfig, currentPartitionGroupMetadataList);
+ Set<Integer> newPartitionGroupSet =
+ newPartitionGroupInfoList.stream().map(PartitionGroupInfo::getPartitionGroupId).collect(Collectors.toSet());
int numPartitions = newPartitionGroupInfoList.size();
// Only if committingSegment's partitionGroup is present in the newPartitionGroupInfoList, we create new segment metadata
String newConsumingSegmentName = null;
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
long newSegmentCreationTimeMs = getCurrentTimeMs();
- for (PartitionGroupInfo partitionGroupInfo : newPartitionGroupInfoList) {
- if (partitionGroupInfo.getPartitionGroupId() == committingSegmentPartitionGroupId) {
- LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
- committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
- committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
- newConsumingSegmentName = newLLCSegment.getSegmentName();
- break;
- }
+ if (newPartitionGroupSet.contains(committingSegmentPartitionGroupId)) {
+ LLCSegmentName newLLCSegment = new LLCSegmentName(rawTableName, committingSegmentPartitionGroupId,
+ committingLLCSegment.getSequenceNumber() + 1, newSegmentCreationTimeMs);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegment, newSegmentCreationTimeMs,
+ committingSegmentDescriptor, committingSegmentZKMetadata, instancePartitions, numPartitions, numReplicas);
+ newConsumingSegmentName = newLLCSegment.getSegmentName();
}
// TODO: create new partition groups also here
@@ -924,7 +923,10 @@ public class PinotLLCRealtimeSegmentManager {
* a) metadata status is IN_PROGRESS, segment state is CONSUMING - happy path
* b) metadata status is IN_PROGRESS, segment state is OFFLINE - create new metadata and new CONSUMING segment
* c) metadata status is DONE, segment state is OFFLINE - create new metadata and new CONSUMING segment
- * d) metadata status is DONE, segment state is CONSUMING - create new metadata and new CONSUMING segment
+ * d) metadata status is DONE, segment state is CONSUMING -
+ * If shard has not reached end of life, create new metadata and new CONSUMING segment. Update current segment to ONLINE in ideal state.
+ * If shard has reached end of life, do not create new metadata or a new CONSUMING segment. Simply update current segment to ONLINE in ideal state.
+ *
* 2) Segment is absent from ideal state - add new segment to ideal state
*
* Also checks if it is too soon to correct (could be in the process of committing segment)
@@ -966,8 +968,8 @@ public class PinotLLCRealtimeSegmentManager {
// Possible things to repair:
// 1. The latest metadata is in DONE state, but the idealstate says segment is CONSUMING:
// a. Create metadata for next segment and find hosts to assign it to.
- // b. update current segment in idealstate to ONLINE
- // c. add new segment in idealstate to CONSUMING on the hosts.
+ // b. update current segment in idealstate to ONLINE (always; step a is skipped if partition is absent from newPartitionGroupInfo)
+ // c. add new segment in idealstate to CONSUMING on the hosts (only if partition is present in newPartitionGroupInfo)
// 2. The latest metadata is IN_PROGRESS, but segment is not there in idealstate.
// a. change prev segment to ONLINE in idealstate
// b. add latest segment to CONSUMING in idealstate.
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index ecbf2ef..e8309d3 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.controller.helix.core.realtime;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -58,7 +59,6 @@ import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.stream.LongMsgOffset;
-import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupInfo;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
@@ -249,6 +249,49 @@ public class PinotLLCRealtimeSegmentManagerTest {
} catch (IllegalStateException e) {
// Expected
}
+
+ // committing segment's partitionGroupId no longer in the newPartitionGroupInfoList
+ List<PartitionGroupInfo> partitionGroupInfoListWithout0 =
+ segmentManager.getPartitionGroupInfoList(segmentManager._streamConfig, Collections.emptyList());
+ partitionGroupInfoListWithout0.remove(0);
+ segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout0;
+
+ // Commit a segment for partition 0 - No new entries created for partition which reached end of life
+ committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName();
+ String committingSegmentStartOffset = segmentManager._segmentZKMetadataMap.get(committingSegment).getStartOffset();
+ String committingSegmentEndOffset =
+ new LongMsgOffset(Long.parseLong(committingSegmentStartOffset) + NUM_DOCS).toString();
+ committingSegmentDescriptor = new CommittingSegmentDescriptor(committingSegment, committingSegmentEndOffset, 0L);
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ int instanceStateMapSize = instanceStatesMap.size();
+ int metadataMapSize = segmentManager._segmentZKMetadataMap.size();
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+ // No changes in the number of ideal state or zk entries
+ assertEquals(instanceStatesMap.size(), instanceStateMapSize);
+ assertEquals(segmentManager._segmentZKMetadataMap.size(), metadataMapSize);
+
+ // Verify instance states: committed segment is ONLINE, and no new consuming segment was added
+ committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
+ assertNotNull(committedSegmentInstanceStateMap);
+ assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+ Collections.singleton(SegmentStateModel.ONLINE));
+
+ consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName();
+ consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+ assertNull(consumingSegmentInstanceStateMap);
+
+ // Verify segment ZK metadata: committed segment is DONE, and no metadata exists for a new consuming segment
+ committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
+ assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+ assertEquals(committedSegmentZKMetadata.getStartOffset(), committingSegmentStartOffset);
+ assertEquals(committedSegmentZKMetadata.getEndOffset(), committingSegmentEndOffset);
+ assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
+ assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+ assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION);
+ assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+
+ consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
+ assertNull(consumingSegmentZKMetadata);
}
/**
@@ -411,6 +454,20 @@ public class PinotLLCRealtimeSegmentManagerTest {
*
* 4. MaxSegmentCompletionTime: Segment completion has 5 minutes to retry and complete between steps 1 and 3.
* Correction: Do not correct the segments before the allowed time for segment completion
+ *
+ *
+ * End-of-shard case:
+ * Additionally, shards of some streams may be detected as having reached end-of-life when committing.
+ * In such cases, step 2 is skipped, and step 3 is done partially (change committing segment state to ONLINE
+ * but don't create new segment with state CONSUMING)
+ *
+ * Scenarios:
+ * 1. Step 3 failed - we will find segment ZK metadata DONE, but ideal state CONSUMING
+ * Correction: Since shard has ended, do not create new segment ZK metadata, or new entry in ideal state.
+ * Simply update CONSUMING segment in ideal state to ONLINE
+ *
+ * 2. Shard which has reached EOL detected - we will find segment ZK metadata DONE and ideal state ONLINE
+ * Correction: No repair needed. Acceptable case.
*/
@Test
public void testRepairs() {
@@ -422,12 +479,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Remove the CONSUMING segment from the ideal state for partition 0 (step 3 failed)
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
removeNewConsumingSegment(instanceStatesMap, consumingSegment, null);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed)
removeNewConsumingSegment(instanceStatesMap, consumingSegment, null);
assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// 2 partitions commit segment
for (int partitionId = 0; partitionId < 2; partitionId++) {
@@ -442,12 +499,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
String latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed)
removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
/*
Test all replicas of the new segment are OFFLINE
@@ -461,12 +518,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 again
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// 2 partitions commit segment
for (int partitionId = 0; partitionId < 2; partitionId++) {
@@ -484,22 +541,51 @@ public class PinotLLCRealtimeSegmentManagerTest {
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName();
latestCommittedSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, CURRENT_TIME_MS).getSegmentName();
removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Remove the CONSUMING segment from the ideal state and segment ZK metadata map for partition 0 (step 2 failed)
removeNewConsumingSegment(instanceStatesMap, consumingSegment, latestCommittedSegment);
assertNotNull(segmentManager._segmentZKMetadataMap.remove(consumingSegment));
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 3, CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
// Turn all the replicas for the CONSUMING segment to OFFLINE for partition 0 again
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 4, CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentOffline(instanceStatesMap, consumingSegment);
- testRepairs(segmentManager);
+ testRepairs(segmentManager, Collections.emptyList());
+
+ /*
+ * End of shard cases
+ */
+ // Partition 1 reached end of shard.
+ List<PartitionGroupInfo> partitionGroupInfoListWithout1 =
+ segmentManager.getPartitionGroupInfoList(segmentManager._streamConfig, Collections.emptyList());
+ partitionGroupInfoListWithout1.remove(1);
+ segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout1;
+ // noop
+ testRepairs(segmentManager, Collections.emptyList());
+
+ // Partition 1 commits segment - should not create new metadata or CONSUMING segment
+ String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, CURRENT_TIME_MS).getSegmentName();
+ String startOffset = segmentManager._segmentZKMetadataMap.get(segmentName).getStartOffset();
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(segmentName,
+ new LongMsgOffset(Long.parseLong(startOffset) + NUM_DOCS).toString(), 0L);
+ committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+ segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+ // ONLINE in IS and metadata DONE, but end of shard (not present in partition group list), so don't repair
+ testRepairs(segmentManager, Lists.newArrayList(1));
+
+ // Turn the last ONLINE segment of the shard back to CONSUMING (simulates a failure between steps 1 and 3)
+ segmentManager._partitionGroupInfoList = partitionGroupInfoListWithout1;
+ consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, CURRENT_TIME_MS).getSegmentName();
+ turnNewConsumingSegmentConsuming(instanceStatesMap, consumingSegment);
+
+ // Repair turns the segment ONLINE in the ideal state, but creates no new entries because the shard has ended.
+ testRepairs(segmentManager, Lists.newArrayList(1));
}
/**
@@ -539,7 +625,19 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
}
- private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager) {
+ /**
+ * Turns all instances for the segment to CONSUMING in the ideal state.
+ */
+ private void turnNewConsumingSegmentConsuming(Map<String, Map<String, String>> instanceStatesMap,
+ String consumingSegment) {
+ Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+ assertNotNull(consumingSegmentInstanceStateMap);
+ for (Map.Entry<String, String> entry : consumingSegmentInstanceStateMap.entrySet()) {
+ entry.setValue(SegmentStateModel.CONSUMING);
+ }
+ }
+
+ private void testRepairs(FakePinotLLCRealtimeSegmentManager segmentManager, List<Integer> shardsEnded) {
Map<String, Map<String, String>> oldInstanceStatesMap =
cloneInstanceStatesMap(segmentManager._idealState.getRecord().getMapFields());
segmentManager._exceededMaxSegmentCompletionTime = false;
@@ -547,7 +645,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
verifyNoChangeToOldEntries(segmentManager, oldInstanceStatesMap);
segmentManager._exceededMaxSegmentCompletionTime = true;
segmentManager.ensureAllPartitionsConsuming();
- verifyRepairs(segmentManager);
+ verifyRepairs(segmentManager, shardsEnded);
}
/**
@@ -564,7 +662,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
}
- private void verifyRepairs(FakePinotLLCRealtimeSegmentManager segmentManager) {
+ private void verifyRepairs(FakePinotLLCRealtimeSegmentManager segmentManager, List<Integer> shardsEnded) {
Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
// Segments are the same for ideal state and ZK metadata
@@ -597,16 +695,18 @@ public class PinotLLCRealtimeSegmentManagerTest {
int numSegments = segments.size();
String latestSegment = segments.get(numSegments - 1);
-
- // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state
Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegment);
- assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
- assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
-
- // Latest segment ZK metadata should be IN_PROGRESS
- assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS);
+ if (!shardsEnded.contains(partitionId)) {
+ // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state
+ assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING));
+ assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE));
+
+ // Latest segment ZK metadata should be IN_PROGRESS
+ assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS);
+ numSegments --;
+ }
- for (int i = 0; i < numSegments - 1; i++) {
+ for (int i = 0; i < numSegments; i++) {
String segmentName = segments.get(i);
// Committed segment should have all instances in ONLINE state
@@ -620,8 +720,13 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Verify segment start/end offset
assertEquals(segmentZKMetadata.getStartOffset(),
new LongMsgOffset(PARTITION_OFFSET.getOffset() + i * (long) NUM_DOCS).toString());
- assertEquals(segmentZKMetadata.getEndOffset(),
- segmentManager._segmentZKMetadataMap.get(segments.get(i + 1)).getStartOffset());
+ if (shardsEnded.contains(partitionId) && ((i + 1) == numSegments)) {
+ assertEquals(Long.parseLong(segmentZKMetadata.getEndOffset()),
+ Long.parseLong(segmentZKMetadata.getStartOffset()) + NUM_DOCS);
+ } else {
+ assertEquals(segmentZKMetadata.getEndOffset(),
+ segmentManager._segmentZKMetadataMap.get(segments.get(i + 1)).getStartOffset());
+ }
}
}
}
@@ -818,6 +923,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Integer> _segmentZKMetadataVersionMap = new HashMap<>();
IdealState _idealState;
int _numPartitions;
+ List<PartitionGroupInfo> _partitionGroupInfoList = null;
boolean _exceededMaxSegmentCompletionTime = false;
FakePinotLLCRealtimeSegmentManager() {
@@ -919,9 +1025,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Override
List<PartitionGroupInfo> getPartitionGroupInfoList(StreamConfig streamConfig,
List<PartitionGroupMetadata> currentPartitionGroupMetadataList) {
- return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i,
- PARTITION_OFFSET))
- .collect(Collectors.toList());
+ if (_partitionGroupInfoList != null) {
+ return _partitionGroupInfoList;
+ } else {
+ return IntStream.range(0, _numPartitions).mapToObj(i -> new PartitionGroupInfo(i, PARTITION_OFFSET)).collect(Collectors.toList());
+ }
}
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index d7aec8d..ae8b138 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
+import org.apache.pinot.common.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.config.InstanceDataManagerConfig;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
@@ -147,6 +148,7 @@ public class LLRealtimeSegmentDataManagerTest {
segmentZKMetadata.setSegmentName(_segmentNameStr);
segmentZKMetadata.setStartOffset(_startOffset.toString());
segmentZKMetadata.setCreationTime(System.currentTimeMillis());
+ segmentZKMetadata.setStatus(Status.IN_PROGRESS);
return segmentZKMetadata;
}
@@ -771,7 +773,7 @@ public class LLRealtimeSegmentDataManagerTest {
public Field _state;
public Field _shouldStop;
public Field _stopReason;
- private Field _streamMsgOffsetFactory;
+ private final Field _checkpointFactory;
public LinkedList<LongMsgOffset> _consumeOffsets = new LinkedList<>();
public LinkedList<SegmentCompletionProtocol.Response> _responses = new LinkedList<>();
public boolean _commitSegmentCalled = false;
@@ -810,9 +812,9 @@ public class LLRealtimeSegmentDataManagerTest {
_stopReason = LLRealtimeSegmentDataManager.class.getDeclaredField("_stopReason");
_stopReason.setAccessible(true);
_semaphoreMap = semaphoreMap;
- _streamMsgOffsetFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_streamPartitionMsgOffsetFactory");
- _streamMsgOffsetFactory.setAccessible(true);
- _streamMsgOffsetFactory.set(this, new LongMsgOffsetFactory());
+ _checkpointFactory = LLRealtimeSegmentDataManager.class.getDeclaredField("_checkpointFactory");
+ _checkpointFactory.setAccessible(true);
+ _checkpointFactory.set(this, new LongMsgOffsetFactory());
}
public String getStopReason() {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org