You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/05/17 19:41:04 UTC
[incubator-pinot] branch master updated: Update time boundary only
when segment is available on server (#6925)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c0983f7 Update time boundary only when segment is available on server (#6925)
c0983f7 is described below
commit c0983f78f8aac1253f5fdbbcc38742fb855b0c0b
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Mon May 17 12:40:50 2021 -0700
Update time boundary only when segment is available on server (#6925)
In TimeBoundaryManager, only update the segment end time when there are ONLINE instances in the external view, to prevent moving the time boundary before the new segment is picked up by the servers.
This should fix the flakiness in BasicAuthRealtimeIntegrationTest.
---
.../routing/timeboundary/TimeBoundaryManager.java | 13 ++++++----
.../timeboundary/TimeBoundaryManagerTest.java | 28 ++++++++++++++++++----
2 files changed, 32 insertions(+), 9 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index c31e311..e62cdb3 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -159,15 +160,19 @@ public class TimeBoundaryManager {
* ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
* <p>NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
* ones. The refreshed segment ZK metadata change won't be picked up.
- * <p>NOTE: {@code externalView} and {@code idealState} are unused, but intentionally passed in in case they are
- * needed in the future.
+ * <p>NOTE: {@code idealState} is unused, but intentionally passed in in case it is needed in the future.
*/
@SuppressWarnings("unused")
public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
Set<String> onlineSegments) {
for (String segment : onlineSegments) {
- _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
- _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
+ // NOTE: Only update the segment end time when there are ONLINE instances in the external view to prevent moving
+ // the time boundary before the new segment is picked up by the servers
+ Map<String, String> instanceStateMap = externalView.getStateMap(segment);
+ if (instanceStateMap != null && instanceStateMap.containsValue(SegmentStateModel.ONLINE)) {
+ _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
+ _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
+ }
}
_endTimeMsMap.keySet().retainAll(onlineSegments);
updateTimeBoundaryInfo(getMaxEndTimeMs());
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index b1a5c18..8ae20b9 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.broker.routing.timeboundary;
+import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.helix.ZNRecord;
@@ -43,6 +45,9 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
+import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -101,33 +106,44 @@ public class TimeBoundaryManagerTest {
}
private void testDailyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit) {
- // NOTE: External view and ideal state are not used in the current implementation.
- ExternalView externalView = Mockito.mock(ExternalView.class);
- IdealState idealState = Mockito.mock(IdealState.class);
+ ExternalView externalView = new ExternalView(tableConfig.getTableName());
+ Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
+ Map<String, String> onlineInstanceStateMap = Collections.singletonMap("server", ONLINE);
+ Map<String, String> offlineInstanceStateMap = Collections.singletonMap("server", OFFLINE);
+ Set<String> onlineSegments = new HashSet<>();
+ // NOTE: Ideal state is not used in the current implementation.
+ IdealState idealState = mock(IdealState.class);
// Start with no segment
TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
- Set<String> onlineSegments = new HashSet<>();
timeBoundaryManager.init(externalView, idealState, onlineSegments);
assertNull(timeBoundaryManager.getTimeBoundaryInfo());
// Add the first segment should update the time boundary
String segment0 = "segment0";
onlineSegments.add(segment0);
+ segmentAssignment.put(segment0, onlineInstanceStateMap);
setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
timeBoundaryManager.init(externalView, idealState, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS));
- // Add a new segment with larger end time should update the time boundary
+ // Add a new segment with larger end time but no ONLINE instance should not update the time boundary
String segment1 = "segment1";
onlineSegments.add(segment1);
+ segmentAssignment.put(segment1, offlineInstanceStateMap);
setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit);
timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
+ verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS));
+
+ // Turn the new segment ONLINE should update the time boundary
+ segmentAssignment.put(segment1, onlineInstanceStateMap);
+ timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
// Add new segment with larger end time but 0 total docs, should not update time boundary
String segmentEmpty = "segmentEmpty";
onlineSegments.add(segmentEmpty);
+ segmentAssignment.put(segmentEmpty, onlineInstanceStateMap);
setSegmentZKMetadataWithTotalDocs(rawTableName, segmentEmpty, 6, timeUnit, 0);
timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
@@ -135,12 +151,14 @@ public class TimeBoundaryManagerTest {
// Add a new segment with smaller end time should not change the time boundary
String segment2 = "segment2";
onlineSegments.add(segment2);
+ segmentAssignment.put(segment2, onlineInstanceStateMap);
setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit);
timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
// Remove the segment with largest end time should update the time boundary
onlineSegments.remove(segment1);
+ segmentAssignment.remove(segment1);
timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org