You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/05/17 19:41:04 UTC

[incubator-pinot] branch master updated: Update time boundary only when segment is available on server (#6925)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c0983f7  Update time boundary only when segment is available on server (#6925)
c0983f7 is described below

commit c0983f78f8aac1253f5fdbbcc38742fb855b0c0b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon May 17 12:40:50 2021 -0700

    Update time boundary only when segment is available on server (#6925)
    
    In TimeBoundaryManager, only update the segment end time when there are ONLINE instances in the external view, to prevent moving the time boundary before the new segment is picked up by the servers.
    This can solve the flakiness in BasicAuthRealtimeIntegrationTest.
---
 .../routing/timeboundary/TimeBoundaryManager.java  | 13 ++++++----
 .../timeboundary/TimeBoundaryManagerTest.java      | 28 ++++++++++++++++++----
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index c31e311..e62cdb3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -159,15 +160,19 @@ public class TimeBoundaryManager {
    * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
    * <p>NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
    * ones. The refreshed segment ZK metadata change won't be picked up.
-   * <p>NOTE: {@code externalView} and {@code idealState} are unused, but intentionally passed in in case they are
-   * needed in the future.
+   * <p>NOTE: {@code idealState} is unused, but intentionally passed in in case it is needed in the future.
    */
   @SuppressWarnings("unused")
   public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
       Set<String> onlineSegments) {
     for (String segment : onlineSegments) {
-      _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
-          _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
+      // NOTE: Only update the segment end time when there are ONLINE instances in the external view to prevent moving
+      //       the time boundary before the new segment is picked up by the servers
+      Map<String, String> instanceStateMap = externalView.getStateMap(segment);
+      if (instanceStateMap != null && instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
+        _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
+            _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
+      }
     }
     _endTimeMsMap.keySet().retainAll(onlineSegments);
     updateTimeBoundaryInfo(getMaxEndTimeMs());
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index b1a5c18..8ae20b9 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.broker.routing.timeboundary;
 
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.ZNRecord;
@@ -43,6 +45,9 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
+import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -101,33 +106,44 @@ public class TimeBoundaryManagerTest {
   }
 
   private void testDailyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit) {
-    // NOTE: External view and ideal state are not used in the current implementation.
-    ExternalView externalView = Mockito.mock(ExternalView.class);
-    IdealState idealState = Mockito.mock(IdealState.class);
+    ExternalView externalView = new ExternalView(tableConfig.getTableName());
+    Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
+    Map<String, String> onlineInstanceStateMap = Collections.singletonMap("server", ONLINE);
+    Map<String, String> offlineInstanceStateMap = Collections.singletonMap("server", OFFLINE);
+    Set<String> onlineSegments = new HashSet<>();
+    // NOTE: Ideal state is not used in the current implementation.
+    IdealState idealState = mock(IdealState.class);
 
     // Start with no segment
     TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
-    Set<String> onlineSegments = new HashSet<>();
     timeBoundaryManager.init(externalView, idealState, onlineSegments);
     assertNull(timeBoundaryManager.getTimeBoundaryInfo());
 
     // Add the first segment should update the time boundary
     String segment0 = "segment0";
     onlineSegments.add(segment0);
+    segmentAssignment.put(segment0, onlineInstanceStateMap);
     setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
     timeBoundaryManager.init(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS));
 
-    // Add a new segment with larger end time should update the time boundary
+    // Add a new segment with larger end time but no ONLINE instance should not update the time boundary
     String segment1 = "segment1";
     onlineSegments.add(segment1);
+    segmentAssignment.put(segment1, offlineInstanceStateMap);
     setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit);
     timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS));
+
+    // Turn the new segment ONLINE should update the time boundary
+    segmentAssignment.put(segment1, onlineInstanceStateMap);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
 
     // Add new segment with larger end time but 0 total docs, should not update time boundary
     String segmentEmpty = "segmentEmpty";
     onlineSegments.add(segmentEmpty);
+    segmentAssignment.put(segmentEmpty, onlineInstanceStateMap);
     setSegmentZKMetadataWithTotalDocs(rawTableName, segmentEmpty, 6, timeUnit, 0);
     timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
@@ -135,12 +151,14 @@ public class TimeBoundaryManagerTest {
     // Add a new segment with smaller end time should not change the time boundary
     String segment2 = "segment2";
     onlineSegments.add(segment2);
+    segmentAssignment.put(segment2, onlineInstanceStateMap);
     setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit);
     timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
 
     // Remove the segment with largest end time should update the time boundary
     onlineSegments.remove(segment1);
+    segmentAssignment.remove(segment1);
     timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org