You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2022/02/11 02:33:49 UTC

[pinot] branch master updated: Enhance revertReplaceSegments api so reverting entry1 in the following example is not allowed: (#8166)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fb572bd  Enhance revertReplaceSegments api so reverting entry1 in the following example is not allowed: (#8166)
fb572bd is described below

commit fb572bd0aba20d2b8a83320df6dd24cb0c654b30
Author: Jiapeng Tao <ji...@linkedin.com>
AuthorDate: Thu Feb 10 18:33:21 2022 -0800

    Enhance revertReplaceSegments api so reverting entry1 in the following example is not allowed: (#8166)
    
    entry1: {Seg_0 -> Seg1, COMPLETED}
    entry2: {Seg_1 -> Seg2, COMPLETED/IN_PROGRESS}
---
 .../helix/core/PinotHelixResourceManager.java       | 21 ++++++++++++++++++++-
 .../helix/core/PinotHelixResourceManagerTest.java   | 18 ++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 938422c..38d47d5 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2871,7 +2871,7 @@ public class PinotHelixResourceManager {
           //    at any time in case of REFRESH use case.
           if (forceCleanup) {
             if (lineageEntry.getState() == LineageEntryState.IN_PROGRESS && CollectionUtils
-              .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) {
+                .isEqualCollection(segmentsFrom, lineageEntry.getSegmentsFrom())) {
               LOGGER.info(
                   "Detected the incomplete lineage entry with the same 'segmentsFrom'. Reverting the lineage "
                       + "entry to unblock the new segment protocol. tableNameWithType={}, entryId={}, segmentsFrom={}, "
@@ -3078,6 +3078,25 @@ public class PinotHelixResourceManager {
           throw new RuntimeException(errorMsg);
         }
 
+        // We do not allow to revert the lineage entry which segments in 'segmentsTo' appear in 'segmentsFrom' of other
+        // 'IN_PROGRESS' or 'COMPLETED' entries. E.g. we do not allow reverting entry1 because it will block reverting
+        // entry2.
+        // entry1: {(Seg_0, Seg_1, Seg_2) -> (Seg_3, Seg_4, Seg_5), COMPLETED}
+        // entry2: {(Seg_3, Seg_4, Seg_5) -> (Seg_6, Seg_7, Seg_8), IN_PROGRESS/COMPLETED}
+        // TODO: need to expand the logic to revert multiple entries in one go when we support > 2 data snapshots
+        for (String currentEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry currentLineageEntry = segmentLineage.getLineageEntry(currentEntryId);
+          if (currentLineageEntry.getState() == LineageEntryState.IN_PROGRESS
+              || currentLineageEntry.getState() == LineageEntryState.COMPLETED) {
+            Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), currentLineageEntry
+                .getSegmentsFrom()), String.format("Cannot revert lineage entry, found segments from 'segmentsTo' "
+                    + "appear in 'segmentsFrom' of another lineage entry. (tableNameWithType='%s', "
+                    + "segmentLineageEntryId='%s', segmentsTo = '%s', segmentLineageEntryId='%s' "
+                    + "segmentsFrom = '%s')", tableNameWithType, segmentLineageEntryId, lineageEntry.getSegmentsTo(),
+                currentEntryId, currentLineageEntry.getSegmentsFrom()));
+          }
+        }
+
         // Update segment lineage entry to 'REVERTED'
         updateSegmentLineageEntryToReverted(tableNameWithType, segmentLineage, segmentLineageEntryId, lineageEntry);
 
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index f41f26e..dac5447 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -75,6 +75,7 @@ import static org.apache.pinot.spi.utils.CommonConstants.Helix.LEAD_CONTROLLER_R
 import static org.apache.pinot.spi.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE;
 import static org.apache.pinot.spi.utils.CommonConstants.Helix.UNTAGGED_SERVER_INSTANCE;
 import static org.apache.pinot.spi.utils.CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+import static org.testng.Assert.fail;
 
 
 public class PinotHelixResourceManagerTest {
@@ -491,6 +492,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -499,6 +501,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -508,6 +511,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -515,6 +519,7 @@ public class PinotHelixResourceManagerTest {
     // Invalid table
     try {
       ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_TABLE_NAME, lineageEntryId);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -522,6 +527,7 @@ public class PinotHelixResourceManagerTest {
     // Invalid lineage entry id
     try {
       ControllerTestUtils.getHelixResourceManager().endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, "aaa");
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -530,6 +536,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -575,6 +582,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, lineageEntryId2, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -613,6 +621,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .startReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_TABLE_NAME, segmentsFrom, segmentsTo, false);
+      fail();
     } catch (Exception e) {
       // expected
     }
@@ -755,6 +764,14 @@ public class PinotHelixResourceManagerTest {
     Assert.assertEquals(new HashSet<>(ControllerTestUtils.getHelixResourceManager()
             .getSegmentsFor(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, true)),
         new HashSet<>(Arrays.asList("s3", "s4", "s5")));
+    // Try to revert the first entry should fail
+    try {
+      ControllerTestUtils.getHelixResourceManager()
+          .revertReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId, false);
+      fail();
+    } catch (Exception e) {
+      // expected
+    }
 
     // Add partial segments to indicate incomplete protocol
     ControllerTestUtils.getHelixResourceManager().addNewSegment(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME,
@@ -799,6 +816,7 @@ public class PinotHelixResourceManagerTest {
     try {
       ControllerTestUtils.getHelixResourceManager()
           .endReplaceSegments(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME, lineageEntryId2);
+      fail();
     } catch (Exception e) {
       // expected
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org