You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2021/11/12 23:04:37 UTC
[pinot] branch master updated: Use oldest offset on newly detected
partitions (#7756)
This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 561b8a3 Use oldest offset on newly detected partitions (#7756)
561b8a3 is described below
commit 561b8a3c6fa832be3ab9e540ca3688a38a11e4c8
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Fri Nov 12 15:04:18 2021 -0800
Use oldest offset on newly detected partitions (#7756)
* Use oldest offset on newly detected partitions
Fixed an issue where Pinot could lose data when it detected new stream partitions
(for example, when the table is configured to start consuming from the largest offset).
Manually tested by running LLCRealtimeClusterIntegrationTest under a debugger:
- Changed the table config to start consuming from the largest offset.
- Forced the test to detect only one partition; observed that roughly
half the rows were ingested, and only partition 0 appeared in the IdealState.
- Ran the RealtimeSegmentValidationManager job via Swagger to force detection
of the new partition.
- Confirmed that all rows are now present.
Issue #7741
* Fixed linter error
---
.../core/realtime/PinotLLCRealtimeSegmentManager.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 7f6996a..16c7471 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -316,7 +316,7 @@ public class PinotLLCRealtimeSegmentManager {
for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) {
String segmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitionGroups, numReplicas, newPartitionGroupMetadataList);
+ numPartitionGroups, numReplicas, newPartitionGroupMetadataList, false);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment,
instancePartitionsMap);
@@ -1193,7 +1193,7 @@ public class PinotLLCRealtimeSegmentManager {
if (!latestSegmentZKMetadataMap.containsKey(partitionGroupId)) {
String newSegmentName =
setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions,
- numPartitions, numReplicas, newPartitionGroupMetadataList);
+ numPartitions, numReplicas, newPartitionGroupMetadataList, true);
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment,
instancePartitionsMap);
}
@@ -1232,10 +1232,16 @@ public class PinotLLCRealtimeSegmentManager {
*/
private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStreamConfig streamConfig,
PartitionGroupMetadata partitionGroupMetadata, long creationTimeMs, InstancePartitions instancePartitions,
- int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ int numPartitionGroups, int numReplicas, List<PartitionGroupMetadata> partitionGroupMetadataList,
+ boolean isLiveTable) {
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
- String startOffset = partitionGroupMetadata.getStartOffset().toString();
+ StreamPartitionMsgOffset startOffset;
+ if (isLiveTable) {
+ startOffset = getPartitionGroupSmallestOffset(streamConfig, partitionGroupId);
+ } else {
+ startOffset = partitionGroupMetadata.getStartOffset();
+ }
LOGGER.info("Setting up new partition group: {} for table: {}", partitionGroupId, realtimeTableName);
String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
@@ -1243,7 +1249,8 @@ public class PinotLLCRealtimeSegmentManager {
new LLCSegmentName(rawTableName, partitionGroupId, STARTING_SEQUENCE_NUMBER, creationTimeMs);
String newSegmentName = newLLCSegmentName.getSegmentName();
- CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
+ CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null,
+ startOffset.toString(), 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
partitionGroupMetadataList);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org