Posted to commits@pinot.apache.org by "deemoliu (via GitHub)" <gi...@apache.org> on 2023/05/31 22:33:21 UTC

[GitHub] [pinot] deemoliu opened a new pull request, #10826: Upsert snapshot should be taken before new a consuming segment created

deemoliu opened a new pull request, #10826:
URL: https://github.com/apache/pinot/pull/10826

   Bug fix for #10800.
   
   Currently, the upsert ValidDocIds snapshot is taken in removeSegments.
   However, some PKs (e.g. pk1) in sealed segments may already be marked invalid because the same PK appears in the new consuming segment.
   When that happens, the snapshot is persisted with pk1 marked invalid.
   
   On restart, the new consuming segment is discarded and re-ingested. This causes an inconsistency, because the server loads the ValidDocIds snapshot instead of rebuilding the valid doc IDs.
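
The failure mode described above can be reproduced with a toy model. The sketch below is hypothetical and does not use Pinot classes: it assumes a single partition where the newest record for a PK always wins (no comparison column), and uses `java.util.BitSet` in place of the roaring bitmap.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of upsert valid-doc tracking for one partition (not Pinot's API).
// Assumption: the newest record for a primary key always wins.
final class ToyUpsertPartition {
  private final Map<String, String> _pkToSegment = new HashMap<>();
  private final Map<String, Integer> _pkToDocId = new HashMap<>();
  private final Map<String, BitSet> _validDocIds = new HashMap<>();

  // Index a record and invalidate the previous location of the same PK, if any.
  void addRecord(String segment, int docId, String pk) {
    String prevSegment = _pkToSegment.put(pk, segment);
    Integer prevDocId = _pkToDocId.put(pk, docId);
    if (prevSegment != null) {
      _validDocIds.get(prevSegment).clear(prevDocId);
    }
    _validDocIds.computeIfAbsent(segment, s -> new BitSet()).set(docId);
  }

  // Copy of a segment's bitmap, standing in for a persisted snapshot file.
  BitSet snapshot(String segment) {
    return (BitSet) _validDocIds.get(segment).clone();
  }
}
```

With `pk1` (doc 0) and `pk2` (doc 1) in sealed `seg0`, a snapshot of `seg0` holds {0, 1}. Once `pk1` is indexed in consuming `seg1`, the same snapshot holds only {1}; if `seg1` is then discarded on restart, `pk1` has no valid doc until it is re-ingested.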
   
   
   `bugfix`
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] deemoliu closed pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu closed pull request #10826: Upsert snapshot should be taken before new a consuming segment created
URL: https://github.com/apache/pinot/pull/10826




[GitHub] [pinot] deemoliu commented on pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on PR #10826:
URL: https://github.com/apache/pinot/pull/10826#issuecomment-1571050105

   cc: @chenboat @Jackie-Jiang @yupeng9 




[GitHub] [pinot] KKcorps commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1217731384


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,46 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = _realtimeTableDataManager.acquireAllSegments();

Review Comment:
   What's the difference between this line and the previous one except the variable name?
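
Beyond the name, the duplicate call matters for reference counting: each `acquireAllSegments()` call acquires every segment again, so adding to a list that already holds all segments duplicates entries and leaves references that are never released. A hypothetical sketch (the registry and the partition function are stand-ins, not Pinot's classes) that acquires once and filters into a fresh list:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.ToIntFunction;

// Hypothetical ref-counted registry standing in for RealtimeTableDataManager.
final class ToySegmentRegistry {
  final Map<String, Integer> _refCounts = new LinkedHashMap<>();

  ToySegmentRegistry(List<String> segmentNames) {
    for (String name : segmentNames) {
      _refCounts.put(name, 0);
    }
  }

  List<String> acquireAllSegments() {
    List<String> acquired = new ArrayList<>(_refCounts.keySet());
    for (String name : acquired) {
      _refCounts.merge(name, 1, Integer::sum);
    }
    return acquired;
  }

  void releaseSegment(String name) {
    _refCounts.merge(name, -1, Integer::sum);
  }

  // Acquire once, keep this partition's segments in a new list, release everything else.
  List<String> acquireSegmentsForPartition(int partitionId, ToIntFunction<String> partitionOf) {
    List<String> forPartition = new ArrayList<>();
    for (String name : acquireAllSegments()) {
      if (partitionOf.applyAsInt(name) == partitionId) {
        forPartition.add(name);
      } else {
        releaseSegment(name);
      }
    }
    return forPartition;
  }
}
```

The caller still has to release every segment in the returned list once the snapshot has been persisted.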





[GitHub] [pinot] codecov-commenter commented on pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #10826:
URL: https://github.com/apache/pinot/pull/10826#issuecomment-1571153555

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/10826?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#10826](https://app.codecov.io/gh/apache/pinot/pull/10826?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (22791a6) into [master](https://app.codecov.io/gh/apache/pinot/commit/579082c8030d9c1d9f61cd078229fca96f4d35ee?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (579082c) will **decrease** coverage by `50.77%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             master   #10826       +/-   ##
   =============================================
   - Coverage     64.40%   13.63%   -50.77%     
   + Complexity     6446      439     -6007     
   =============================================
     Files          2098     2110       +12     
     Lines        113315   113947      +632     
     Branches      17204    17321      +117     
   =============================================
   - Hits          72984    15541    -57443     
   - Misses        35082    97137    +62055     
   + Partials       5249     1269     -3980     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests1 | `?` | |
   | unittests2 | `13.63% <0.00%> (-0.06%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/10826?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...manager/realtime/LLRealtimeSegmentDataManager.java](https://app.codecov.io/gh/apache/pinot/pull/10826?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtY29yZS9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY29yZS9kYXRhL21hbmFnZXIvcmVhbHRpbWUvTExSZWFsdGltZVNlZ21lbnREYXRhTWFuYWdlci5qYXZh) | `0.00% <0.00%> (-51.39%)` | :arrow_down: |
   | [...l/indexsegment/immutable/ImmutableSegmentImpl.java](https://app.codecov.io/gh/apache/pinot/pull/10826?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC9pbmRleHNlZ21lbnQvaW1tdXRhYmxlL0ltbXV0YWJsZVNlZ21lbnRJbXBsLmphdmE=) | `0.00% <ø> (-61.12%)` | :arrow_down: |
   | [...cal/upsert/BasePartitionUpsertMetadataManager.java](https://app.codecov.io/gh/apache/pinot/pull/10826?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3Qtc2VnbWVudC1sb2NhbC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3Qvc2VnbWVudC9sb2NhbC91cHNlcnQvQmFzZVBhcnRpdGlvblVwc2VydE1ldGFkYXRhTWFuYWdlci5qYXZh) | `0.00% <ø> (-50.00%)` | :arrow_down: |
   
   ... and [1471 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/10826/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [pinot] ankitsultana commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "ankitsultana (via GitHub)" <gi...@apache.org>.
ankitsultana commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1215787274


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();

Review Comment:
   Yeah, that's what I think as well.
   
   I wasn't sure why two OFFLINE -> CONSUMING transitions might happen for two segments of the same partition on the same server, so I wanted to reproduce the issue at least once. One possible scenario I could think of: when a server is restarted, the other server might commit a segment (say segment 10), and before the restarted server processes the OFFLINE -> CONSUMING transition for segment 10, it might start processing the same transition for segment 11.





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1212393148


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1071,6 +1075,21 @@ private void closeStreamConsumers() {
     if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
       _partitionGroupConsumerSemaphore.release();
     }
+    if (_tableConfig.getUpsertConfig().isEnableSnapshot()) {

Review Comment:
   Why do we take the snapshot here? `_isReadyToConsumeData` won't be picked up by the next segment.
   I feel we can take the snapshot just after acquiring the consumer semaphore.
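
A minimal sketch of that ordering, assuming a per-partition `java.util.concurrent.Semaphore` with one permit and hypothetical `takeSnapshot`/`consume` callbacks (not Pinot methods). The snapshot runs while the permit is held and before the new segment indexes any rows; whether the previous segment is already sealed at that point is a separate question.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch: the per-partition semaphore admits one consuming segment at a time.
final class ConsumerStartup {
  static void start(Semaphore partitionSemaphore, Runnable takeSnapshot, Runnable consume)
      throws InterruptedException {
    partitionSemaphore.acquire();
    try {
      // Snapshot first: the new segment has not indexed any rows yet, so none of its
      // upserts can have invalidated docs in the sealed segments.
      takeSnapshot.run();
      consume.run();
    } finally {
      // Released once consumption ends, letting the next segment of the partition start.
      partitionSemaphore.release();
    }
  }
}
```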





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1214856674


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -232,7 +236,7 @@ public void deleteSegmentFile() {
   // consuming.
   private final AtomicBoolean _acquiredConsumerSemaphore;
   private final ServerMetrics _serverMetrics;
-  private final BooleanSupplier _isReadyToConsumeData;
+  private BooleanSupplier _isReadyToConsumeData;

Review Comment:
   Revert this



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -208,8 +208,39 @@ protected void doInit() {
       _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics);
     }
 
-    // For dedup and partial-upsert, need to wait for all segments loaded before starting consuming data
-    if (isDedupEnabled() || isPartialUpsertEnabled()) {
+    if (tableConfig.getUpsertConfig() != null && tableConfig.getUpsertConfig().isEnableSnapshot()

Review Comment:
   I don't think we need the change in this file



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
+      if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+        // persist snapshot for all sealed segments
+        // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.

Review Comment:
   Please add more description here. Basically, right now we cannot guarantee that the previous segment has already been replaced with the immutable segment, so the snapshot might not be persisted for the previous consuming segment.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java:
##########
@@ -159,7 +159,7 @@ public void deleteValidDocIdsSnapshot() {
     }
   }
 
-  private File getValidDocIdsSnapshotFile() {
+  public File getValidDocIdsSnapshotFile() {

Review Comment:
   Do we need to expose the internal file? IMO, we should use `loadValidDocIdsFromSnapshot()` to access it.
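
A hypothetical sketch of that encapsulation: the class below is a stand-in for `ImmutableSegmentImpl` (not its real implementation), it uses `java.util.BitSet` instead of a roaring bitmap, and the snapshot file name is illustrative. The file stays private and callers only see persist/load methods.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.BitSet;

// Hypothetical stand-in for a segment that owns its valid-doc-ids snapshot file.
final class ToySnapshotSegment {
  private final File _indexDir;

  ToySnapshotSegment(File indexDir) {
    _indexDir = indexDir;
  }

  // Kept private so callers cannot depend on the on-disk layout (file name is illustrative).
  private File getValidDocIdsSnapshotFile() {
    return new File(_indexDir, "validdocids.bitmap.snapshot");
  }

  void persistValidDocIdsSnapshot(BitSet validDocIds) {
    try {
      Files.write(getValidDocIdsSnapshotFile().toPath(), validDocIds.toByteArray());
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Returns null when no snapshot has been persisted yet.
  BitSet loadValidDocIdsFromSnapshot() {
    File file = getValidDocIdsSnapshotFile();
    if (!file.exists()) {
      return null;
    }
    try {
      return BitSet.valueOf(Files.readAllBytes(file.toPath()));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```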



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
+      if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+        // persist snapshot for all sealed segments
+        // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.
+        List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+        for (SegmentDataManager segmentDataManager: allSegments) {
+          if (segmentDataManager.getSegment() instanceof ImmutableSegment) {
+            MutableRoaringBitmap validDocIds = new MutableRoaringBitmap();

Review Comment:
   We don't want to initialize `validDocIds` with an empty bitmap. If valid doc IDs don't exist (not sure if that is possible), we shouldn't persist them as empty.
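
A minimal sketch of the suggested behavior, with `BitSet` standing in for `MutableRoaringBitmap` and a hypothetical `persister` callback: skip the snapshot when there is no bitmap instead of writing an empty one, which on reload would presumably mark every doc in the segment invalid.

```java
import java.util.BitSet;
import java.util.function.Consumer;

// Hypothetical guard around snapshot persistence (BitSet stands in for the roaring bitmap).
final class SnapshotGuard {
  // Persists a copy only when a bitmap exists; returns whether anything was written.
  static boolean persistIfPresent(BitSet validDocIds, Consumer<BitSet> persister) {
    if (validDocIds == null) {
      // Nothing tracked: do not fabricate an empty snapshot.
      return false;
    }
    persister.accept((BitSet) validDocIds.clone());
    return true;
  }
}
```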



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
+      if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+        // persist snapshot for all sealed segments
+        // TODO: Use a semaphore to guarantee all the segments are sealed before persisting snapshot.
+        List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+        for (SegmentDataManager segmentDataManager: allSegments) {

Review Comment:
   We don't want to persist snapshots for all segments, only for the segments within this partition.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1430,20 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();

Review Comment:
   cc @ankitsultana This might be the root cause of #10552. If, for some reason, two segments are going through the OFFLINE -> CONSUMING state transition (one segment just committed) and the newer segment acquires this lock, the older segment will block here and never finish the OFFLINE -> CONSUMING state transition.
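
The blocking described above can be observed in isolation, assuming the partition semaphore has a single permit. In this hypothetical sketch the newer segment takes the permit first; the older segment's attempt then fails for as long as the newer one holds it. The real code calls `acquire()`, which would simply keep waiting; `tryAcquire` with a timeout is used here only to make the stall visible.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: one permit per partition, newer segment wins the race.
final class SemaphoreOrdering {
  // Returns whether the older segment could get the permit after the newer one took it.
  static boolean olderSegmentGetsPermit(long waitMillis) throws InterruptedException {
    Semaphore partitionSemaphore = new Semaphore(1);
    partitionSemaphore.acquire(); // newer segment acquires and keeps consuming
    // acquire() would block here until the newer segment releases the permit.
    return partitionSemaphore.tryAcquire(waitMillis, TimeUnit.MILLISECONDS);
  }
}
```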





[GitHub] [pinot] deemoliu commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1212435986


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1071,6 +1075,21 @@ private void closeStreamConsumers() {
     if (_acquiredConsumerSemaphore.compareAndSet(true, false)) {
       _partitionGroupConsumerSemaphore.release();
     }
+    if (_tableConfig.getUpsertConfig().isEnableSnapshot()) {

Review Comment:
   @Jackie-Jiang thanks for the review. When acquiring the consumer semaphore, is it guaranteed that the previous segment is sealed?
   
   I plan to acquire all segments and persist snapshots for all sealed segments. Is it possible that the last segment is not sealed yet?





[GitHub] [pinot] yupeng9 commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "yupeng9 (via GitHub)" <gi...@apache.org>.
yupeng9 commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1212533342


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1426,6 +1470,21 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
     try {
       _partitionGroupConsumerSemaphore.acquire();
       _acquiredConsumerSemaphore.set(true);
+      if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+        // block ingestion for new consuming segments
+        _isReadyToConsumeData = () -> false;

Review Comment:
   So this will always override the `isReadyToConsumeData` passed to `LLRealtimeSegmentDataManager`. Is there any case where we want to use the input param?
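
One way to honor the constructor argument instead of replacing it is to combine it with a snapshot flag, which would also let the field stay `final`. In this sketch the `ReadinessGate` class and the `snapshotDone` flag are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: gate ingestion on a snapshot flag without discarding the caller's supplier.
final class ReadinessGate {
  static BooleanSupplier combine(BooleanSupplier isReadyToConsumeData, AtomicBoolean snapshotDone) {
    // Both conditions must hold: the readiness passed in by the caller
    // (e.g. all segments loaded) and the snapshot having been persisted.
    return () -> snapshotDone.get() && isReadyToConsumeData.getAsBoolean();
  }
}
```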





[GitHub] [pinot] KKcorps commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1217742086


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,46 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = _realtimeTableDataManager.acquireAllSegments();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments for this partition not sealed completely
+          if (allSegmentsForPartition.stream().anyMatch(
+              segmentDataManager -> segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())) {
+            do {
+              //noinspection BusyWait
+              Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+            } while (allSegmentsForPartition.stream().anyMatch(
+                segmentDataManager -> !segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment()));
+          }
+          // Persist snapshot and release all the segments for this partition, they should be immutableSegments.
+          for (SegmentDataManager segmentDataManager: allSegmentsForPartition) {
+            assert segmentDataManager.getSegment() instanceof ImmutableSegment;
+            if (segmentDataManager.getSegment().getValidDocIds() != null) {
+              MutableRoaringBitmap validDocIds =
+                  segmentDataManager.getSegment().getValidDocIds().getMutableRoaringBitmap();
+              ((ImmutableSegmentImpl) segmentDataManager.getSegment()).persistValidDocIdsSnapshot(validDocIds);

Review Comment:
   Would it be possible to add a `.snapshot` method to the upsert metadata manager itself, which takes care of the snapshot logic?
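
A hypothetical sketch of that refactoring: a per-partition manager exposes a single `takeSnapshot` entry point, so the consuming-segment code only decides when to call it. Class and method names are illustrative (not Pinot's actual classes), `BitSet` stands in for the roaring bitmap, and the sealed check and persistence are passed in as callbacks.

```java
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

// Hypothetical per-partition manager that owns the snapshot logic.
final class ToyPartitionUpsertMetadataManager {
  private final Map<String, BitSet> _validDocIdsBySegment = new LinkedHashMap<>();

  void trackSegment(String segmentName, BitSet validDocIds) {
    _validDocIdsBySegment.put(segmentName, validDocIds);
  }

  // Persists a snapshot for every sealed segment of this partition; returns how many were written.
  int takeSnapshot(Predicate<String> isSealed, BiConsumer<String, BitSet> persister) {
    int persisted = 0;
    for (Map.Entry<String, BitSet> entry : _validDocIdsBySegment.entrySet()) {
      // Skip mutable segments and segments without a bitmap.
      if (isSealed.test(entry.getKey()) && entry.getValue() != null) {
        persister.accept(entry.getKey(), (BitSet) entry.getValue().clone());
        persisted++;
      }
    }
    return persisted;
  }
}
```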





[GitHub] [pinot] KKcorps commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "KKcorps (via GitHub)" <gi...@apache.org>.
KKcorps commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1217732143


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,46 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = _realtimeTableDataManager.acquireAllSegments();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments for this partition not sealed completely
+          if (allSegmentsForPartition.stream().anyMatch(
+              segmentDataManager -> segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())) {
+            do {
+              //noinspection BusyWait
+              Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);

Review Comment:
   We already have a separate busy-wait flow for partial upserts, right?
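
If the existing wait is reused, both sleep-and-poll loops quoted in this review could share a helper like the hypothetical one below (names are illustrative). The stop condition is a parameter, so a caller could pass something like `() -> _shouldStop || endCriteriaReached()`.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper that factors out the sleep-and-poll loop.
final class PollingWait {
  // Polls until ready or told to stop; returns whether the condition became true.
  static boolean waitUntil(BooleanSupplier ready, BooleanSupplier shouldStop, long intervalMs)
      throws InterruptedException {
    while (!ready.getAsBoolean()) {
      if (shouldStop.getAsBoolean()) {
        return false;
      }
      Thread.sleep(intervalMs);
    }
    return true;
  }
}
```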





[GitHub] [pinot] Jackie-Jiang commented on a diff in pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "Jackie-Jiang (via GitHub)" <gi...@apache.org>.
Jackie-Jiang commented on code in PR #10826:
URL: https://github.com/apache/pinot/pull/10826#discussion_r1224982841


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {

Review Comment:
   This will break for uploaded segments. Use `SegmentUtils.getRealtimeSegmentPartitionId()` to read the partition ID.
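
The breakage comes from parsing the partition out of the segment name, which only works for names in the LLC format (`{table}__{partitionGroupId}__{sequenceNumber}__{creationTime}`). A hypothetical sketch of a safer lookup that falls back to segment metadata; this is not how `SegmentUtils.getRealtimeSegmentPartitionId()` is implemented, and the metadata callback is an assumption.

```java
import java.util.function.ToIntFunction;

// Hypothetical helper: resolve a realtime segment's partition id without assuming LLC naming.
final class PartitionIds {
  static int partitionIdOf(String segmentName, ToIntFunction<String> partitionFromMetadata) {
    String[] parts = segmentName.split("__");
    if (parts.length == 4) {
      try {
        return Integer.parseInt(parts[1]);
      } catch (NumberFormatException e) {
        // Not an LLC name after all; fall through to metadata.
      }
    }
    // Uploaded segments can have arbitrary names, so read the partition from metadata instead.
    return partitionFromMetadata.applyAsInt(segmentName);
  }
}
```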



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments (except the new consuming segment) for this partition not sealed completely.
+          // The only consuming segment should be the new consuming segment, all the other segments should be persisted.
+          if (allSegmentsForPartition.stream()
+              .filter(segmentDataManager -> segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())

Review Comment:
   To be more robust, you could count only the mutable segments where `segmentDataManager != this`, in case this segment is somehow not among the acquired ones.
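Below is a minimal sketch of this suggestion, not code from the PR. It assumes a hypothetical `SegmentHandle` interface in place of Pinot's `SegmentDataManager` and segment metadata. The check compares each handle to the current consumer by identity and looks for any *other* mutable segment, instead of relying on `count() > 1`.

```java
import java.util.List;

// Hypothetical, simplified stand-ins; not Pinot's actual SegmentDataManager API.
final class MutableSegmentCheck {
  interface SegmentHandle {
    boolean isMutable();
  }

  /**
   * Returns true if any acquired segment other than {@code self} is still mutable,
   * i.e. the previous consuming segment has not been sealed yet. Comparing by identity
   * keeps the check correct even when {@code self} is not in the acquired list.
   */
  static boolean hasOtherMutableSegment(List<? extends SegmentHandle> acquired, SegmentHandle self) {
    for (SegmentHandle handle : acquired) {
      if (handle != self && handle.isMutable()) {
        return true;
      }
    }
    return false;
  }
}
```

With `count() > 1`, a partition whose only other mutable segment is the previous consuming segment slips through whenever this segment itself was not acquired. The identity check avoids that case.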



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1241,6 +1283,15 @@ private boolean catchupToFinalOffset(StreamPartitionMsgOffset endOffset, long ti
     _state = State.CONSUMING_TO_ONLINE;
     _shouldStop = false;
     try {
+      if (!_isReadyToConsumeData.getAsBoolean()) {

Review Comment:
   Remove this part. We don't need to verify this in the catch-up phase.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments (except the new consuming segment) for this partition not sealed completely.
+          // The only consuming segment should be the new consuming segment, all the other segments should be persisted.
+          if (allSegmentsForPartition.stream()
+              .filter(segmentDataManager -> segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())
+              .count() > 1) {
+            do {
+              //noinspection BusyWait
+              Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+            } while (allSegmentsForPartition.stream()

Review Comment:
   (MAJOR) This loop condition will never change unless the segments are acquired again.
   We should re-acquire the segments that were previously mutable inside the loop, until there is no mutable segment left.
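Below is a minimal sketch of the suggested fix, not code from the PR. It assumes hypothetical `Segment` and `SegmentRegistry` interfaces in place of Pinot's segment and table data manager classes. Following the comment, it assumes that a list acquired once before the loop is never refreshed when the previous consuming segment is replaced. For that reason the sketch re-acquires (and releases) the partition's segments on every poll.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical, simplified stand-ins; not Pinot's actual classes.
final class WaitForSealedSegments {
  interface Segment {
    boolean isMutable();
  }

  interface SegmentRegistry {
    /** Acquires (reference-counts) the current segments of one partition. */
    List<Segment> acquireAll();

    void release(Segment segment);
  }

  /**
   * Polls until no segment other than {@code self} is mutable. The list is re-acquired
   * on every iteration, so a replaced segment is actually observed.
   *
   * @return true once all other segments are sealed; false if stopped or interrupted first
   */
  static boolean awaitSealed(SegmentRegistry registry, Segment self, long intervalMs,
      BooleanSupplier shouldStop) {
    while (!shouldStop.getAsBoolean()) {
      List<Segment> acquired = registry.acquireAll();
      boolean otherMutable;
      try {
        otherMutable = acquired.stream().anyMatch(s -> s != self && s.isMutable());
      } finally {
        // Release this round's references before sleeping.
        for (Segment s : acquired) {
          registry.release(s);
        }
      }
      if (!otherMutable) {
        return true;
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return false;
  }
}
```

The `shouldStop` supplier plays the role of the `_shouldStop` check used by the readiness loop in the diff, so a stopping consumer does not wait forever.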



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();

Review Comment:
   We should use a try-finally after acquiring all segments and release all the acquired segments in the finally block, so that no segment is left unreleased when an exception happens.
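Below is a minimal sketch of the acquire/release pattern the comment asks for, not code from the PR. The generic `acquireAll`, `release` and `action` parameters are hypothetical hooks standing in for `acquireAllSegments()`, `releaseSegment(...)` and the snapshot logic.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

final class SegmentAcquisition {
  /**
   * Acquires all segments, runs {@code action} on them, and releases every acquired
   * segment in a finally block. An exception thrown by {@code action} (for example
   * while persisting a snapshot) therefore cannot leave segments unreleased.
   */
  static <T> void withAcquiredSegments(Supplier<List<T>> acquireAll, Consumer<T> release,
      Consumer<List<T>> action) {
    List<T> acquired = acquireAll.get();
    try {
      action.accept(acquired);
    } finally {
      for (T segment : acquired) {
        release.accept(segment);
      }
    }
  }
}
```

In this variant the action only filters by partition and does not release other partitions' segments early. Everything is released exactly once in the finally block, which also rules out double releases.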



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -662,6 +659,51 @@ public void run() {
       long catchUpTimeMillis = 0L;
       _startTimeMs = now();
       try {
+        // Check READY_TO_CONSUME_DATA before enter consumeLoop in the new PartitionConsumer.
+        if (!_isReadyToConsumeData.getAsBoolean()) {
+          do {
+            //noinspection BusyWait
+            Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+          } while (!_shouldStop && !endCriteriaReached() && !_isReadyToConsumeData.getAsBoolean());
+        }
+
+        if (_tableConfig.getUpsertConfig() != null && _tableConfig.getUpsertConfig().isEnableSnapshot()) {
+          // Persist snapshot for sealed segments. We need to guarantee the previous segment is already replaced with
+          // the immutable segment, so the snapshot might not be persisted for the previous consuming segment.
+          List<SegmentDataManager> allSegments = _realtimeTableDataManager.acquireAllSegments();
+          List<SegmentDataManager> allSegmentsForPartition = new ArrayList<>();
+          for (SegmentDataManager segmentDataManager: allSegments) {
+            // release segments not this partition
+            if (_partitionGroupId == new LLCSegmentName(segmentDataManager.getSegmentName()).getPartitionGroupId()) {
+              allSegmentsForPartition.add(segmentDataManager);
+            } else {
+              _realtimeTableDataManager.releaseSegment(segmentDataManager);
+            }
+          }
+          // wait if all segments (except the new consuming segment) for this partition not sealed completely.
+          // The only consuming segment should be the new consuming segment, all the other segments should be persisted.
+          if (allSegmentsForPartition.stream()
+              .filter(segmentDataManager -> segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())
+              .count() > 1) {
+            do {
+              //noinspection BusyWait
+              Thread.sleep(RealtimeTableDataManager.READY_TO_CONSUME_DATA_CHECK_INTERVAL_MS);
+            } while (allSegmentsForPartition.stream()
+                .filter(segmentDataManager -> segmentDataManager.getSegment().getSegmentMetadata().isMutableSegment())
+                .count() > 1);
+          }
+          // Persist snapshot and release all immutable segments for this partition.
+          for (SegmentDataManager segmentDataManager: allSegmentsForPartition) {
+            if (segmentDataManager.getSegment() instanceof ImmutableSegment) {

Review Comment:
   We should check for `ImmutableSegmentImpl` instead, because `EmptySegment` is also an `ImmutableSegment`.
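Below is a minimal sketch of why the narrower type check matters, not code from the PR. The nested `ImmutableSegment`, `ImmutableSegmentImpl` and `EmptySegment` types are simplified stand-ins that only mirror the names used in the comment. They are not Pinot's real classes.

```java
import java.util.ArrayList;
import java.util.List;

final class SnapshotTargetFilter {
  // Simplified stand-ins for the segment types named in the review; not Pinot's real classes.
  interface ImmutableSegment {
    String name();
  }

  static final class ImmutableSegmentImpl implements ImmutableSegment {
    private final String _name;

    ImmutableSegmentImpl(String name) {
      _name = name;
    }

    @Override
    public String name() {
      return _name;
    }
  }

  /** A placeholder segment without data that also implements ImmutableSegment. */
  static final class EmptySegment implements ImmutableSegment {
    private final String _name;

    EmptySegment(String name) {
      _name = name;
    }

    @Override
    public String name() {
      return _name;
    }
  }

  /** Keeps only ImmutableSegmentImpl instances; EmptySegment and anything else are skipped. */
  static List<ImmutableSegmentImpl> snapshotTargets(List<?> segments) {
    List<ImmutableSegmentImpl> targets = new ArrayList<>();
    for (Object segment : segments) {
      // `segment instanceof ImmutableSegment` would also match EmptySegment.
      if (segment instanceof ImmutableSegmentImpl) {
        targets.add((ImmutableSegmentImpl) segment);
      }
    }
    return targets;
  }
}
```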





[GitHub] [pinot] deemoliu commented on pull request #10826: Upsert snapshot should be taken before new a consuming segment created

Posted by "deemoliu (via GitHub)" <gi...@apache.org>.
deemoliu commented on PR #10826:
URL: https://github.com/apache/pinot/pull/10826#issuecomment-1595422864

   Discussed with @Jackie-Jiang. The fix will be addressed in the partitionUpsertMetadata manager instead; see #10928.
   

