You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/03/01 21:16:39 UTC
[incubator-pinot] branch master updated: Reduce
MAX_SEGMENT_COMPLETION_TIME_MILLIS to 5 minutes (#3893)
This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4dc59ff Reduce MAX_SEGMENT_COMPLETION_TIME_MILLIS to 5 minutes (#3893)
4dc59ff is described below
commit 4dc59ff95786f5f9c5e30c1b236c1282ea15efa8
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Mar 1 13:16:30 2019 -0800
Reduce MAX_SEGMENT_COMPLETION_TIME_MILLIS to 5 minutes (#3893)
Reducing MAX_SEGMENT_COMPLETION_TIME_MILLIS from 10 minutes to 5 (and possibly lower in the future). 10 minutes is a lot for segment completion. This 10 minutes gets added into our SLA of being able to fix consuming partitions, no matter how fast we can run RealtimeSegmentValidationManager.
---
.../pinot/common/utils/helix/HelixHelper.java | 17 ++++++
.../common/partition/IdealStateBuilderUtil.java | 6 ++
.../realtime/PinotLLCRealtimeSegmentManager.java | 60 +++++++++++++++-----
.../pinot/controller/helix/HelixHelperTest.java | 43 +++++++++++---
.../PinotLLCRealtimeSegmentManagerTest.java | 65 ++++++++++++++++++++++
5 files changed, 170 insertions(+), 21 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index fdf2deb..21f2153 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -92,6 +92,9 @@ public class HelixHelper {
IdealState updatedIdealState;
try {
updatedIdealState = updater.apply(idealStateCopy);
+ } catch (PermanentUpdaterException e) {
+ LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e);
+ throw e;
} catch (Exception e) {
LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e);
return false;
@@ -140,6 +143,20 @@ public class HelixHelper {
}
}
+ /**
+ * Exception to be thrown by the updater function to exit from the retry loop in {@link HelixHelper#updateIdealState}
+ */
+ public static class PermanentUpdaterException extends RuntimeException {
+
+ public PermanentUpdaterException(String message) {
+ super(message);
+ }
+
+ public PermanentUpdaterException(Throwable cause) {
+ super(cause);
+ }
+ }
+
public static void updateIdealState(final HelixManager helixManager, final String resourceName,
final Function<IdealState, IdealState> updater, RetryPolicy policy) {
updateIdealState(helixManager, resourceName, updater, policy, false);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java b/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java
index c55e758..1f0ac3e 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java
@@ -45,6 +45,12 @@ public class IdealStateBuilderUtil {
_idealState = new IdealState(tableNameWithType);
}
+ public IdealStateBuilderUtil(IdealState idealState, String tableNameWithType) {
+ _tableName = tableNameWithType;
+ _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ _idealState = idealState;
+ }
+
public IdealState build() {
return _idealState;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e814abe..82d2e31 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -107,7 +107,7 @@ public class PinotLLCRealtimeSegmentManager {
private static final String METADATA_TEMP_DIR_SUFFIX = ".metadata.tmp";
private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
- // Max time to wait for all LLC segments to complete committing their metadata.
+ // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
// TODO: make this configurable with default set to 10
@@ -118,7 +118,7 @@ public class PinotLLCRealtimeSegmentManager {
* This includes any backoffs and retries for the steps 2 and 3
* The segment will be eligible for repairs by the validation manager, if the time exceeds this value
*/
- private static int MAX_SEGMENT_COMPLETION_TIME_MINS = 10;
+ private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
private static PinotLLCRealtimeSegmentManager INSTANCE = null;
@@ -503,7 +503,7 @@ public class PinotLLCRealtimeSegmentManager {
// TODO Introduce a controller failure here for integration testing
// When multiple segments of the same table complete around the same time it is possible that
- // the idealstate udpate fails due to contention. We serialize the updates to the idealstate
+ // the idealstate update fails due to contention. We serialize the updates to the idealstate
// to reduce this contention. We may still contend with RetentionManager, or other updates
// to idealstate from other controllers, but then we have the retry mechanism to get around that.
// hash code can be negative, so make sure we are getting a positive lock index
@@ -514,7 +514,10 @@ public class PinotLLCRealtimeSegmentManager {
updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentNameStr, newSegmentNameStr,
partitionAssignment);
LOGGER.info("Changed {} to ONLINE and created {} in CONSUMING", committingSegmentNameStr, newSegmentNameStr);
- } finally {
+ } catch (Exception e) {
+ LOGGER.error("Caught exception when updating ideal state for {}", committingSegmentNameStr, e);
+ return false;
+ } finally {
lock.unlock();
}
@@ -987,6 +990,18 @@ public class PinotLLCRealtimeSegmentManager {
@Nullable
@Override
public IdealState apply(@Nullable IdealState idealState) {
+ // When segment completion begins, the zk metadata is updated, followed by ideal state.
+ // We allow only {@link PinotLLCRealtimeSegmentManager#MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a segment to complete,
+ // after which the segment is eligible for repairs by the {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}.
+ // After updating metadata, if more than {@link PinotLLCRealtimeSegmentManager#MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms elapse and ideal state is still not updated,
+ // the segment could have already been fixed by {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}.
+ // Therefore, we do not want to proceed with the ideal state update if the max segment completion time has been exceeded.
+ if (isExceededMaxSegmentCompletionTime(tableNameWithType, currentSegmentId, System.currentTimeMillis())) {
+ LOGGER.error("Exceeded max segment completion time. Skipping ideal state update for segment {}",
+ currentSegmentId);
+ throw new HelixHelper.PermanentUpdaterException(
+ "Exceeded max segment completion time for segment " + currentSegmentId);
+ }
return updateIdealStateOnSegmentCompletion(idealState, currentSegmentId, newSegmentId, partitionAssignment);
}
}, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
@@ -1052,13 +1067,29 @@ public class PinotLLCRealtimeSegmentManager {
Stat stat = new Stat();
LLCRealtimeSegmentZKMetadata metadata = getRealtimeSegmentZKMetadata(tableNameWithType, segmentId, stat);
long metadataUpdateTime = stat.getMtime();
- if (now < metadataUpdateTime + TimeUnit.MILLISECONDS.convert(MAX_SEGMENT_COMPLETION_TIME_MINS, TimeUnit.MINUTES)) {
+ if (now < metadataUpdateTime + MAX_SEGMENT_COMPLETION_TIME_MILLIS) {
LOGGER.info("Too soon to correct segment:{} updateTime: {} now:{}", segmentId, metadataUpdateTime, now);
return true;
}
return false;
}
+ /**
+ *
+ * Returns true if more than {@link PinotLLCRealtimeSegmentManager#MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms have elapsed since the segment metadata was last updated
+ */
+ private boolean isExceededMaxSegmentCompletionTime(String tableNameWithType, String segmentId, long now) {
+ Stat stat = new Stat();
+ LLCRealtimeSegmentZKMetadata metadata = getRealtimeSegmentZKMetadata(tableNameWithType, segmentId, stat);
+ long metadataUpdateTime = stat.getMtime();
+ if (now > metadataUpdateTime + MAX_SEGMENT_COMPLETION_TIME_MILLIS) {
+ LOGGER.info("Segment:{}, Now:{}, metadataUpdateTime:{}, Exceeded MAX_SEGMENT_COMPLETION_TIME_MILLIS:{}",
+ segmentId, now, metadataUpdateTime, MAX_SEGMENT_COMPLETION_TIME_MILLIS);
+ return true;
+ }
+ return false;
+ }
+
private boolean isAllInstancesInState(Map<String, String> instanceStateMap, String state) {
return instanceStateMap.values().stream().allMatch(value -> value.equals(state));
}
@@ -1357,7 +1388,7 @@ public class PinotLLCRealtimeSegmentManager {
@Nonnull String currentSegmentId, @Nonnull String newSegmentId,
@Nonnull PartitionAssignment partitionAssignment) {
- Map<String, List<String>> instanceAssignments = null;
+ Map<String, List<String>> instanceAssignments;
RealtimeSegmentAssignmentStrategy strategy = new ConsumingSegmentAssignmentStrategy();
try {
@@ -1375,15 +1406,16 @@ public class PinotLLCRealtimeSegmentManager {
PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE);
}
- // We may have (for whatever reason) a different instance list in the idealstate for the new segment.
- // If so, clear it, and then set the instance state for the set of instances that we know should be there.
+ // The {@link RealtimeSegmentValidationManager} will fix metadata and ideal state after {@link #MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms of inactivity on the committing segment.
+ // If the ideal state update during completion took longer than {@link #MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms, the update could already have been done by the fixer thread.
+ // We do not want to overwrite the ideal state. It is possible that the new segment created by the fixer has already progressed to ONLINE.
+ // If we let the below update happen, we will be bringing an ONLINE segment back to CONSUMING, and end up with 2 CONSUMING segments for the partition
Map<String, String> stateMap = idealState.getInstanceStateMap(newSegmentId);
- if (stateMap != null) {
- stateMap.clear();
- }
- for (String instance : newSegmentInstances) {
- idealState
- .setPartitionState(newSegmentId, instance, PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
+ if (stateMap == null) {
+ for (String instance : newSegmentInstances) {
+ idealState
+ .setPartitionState(newSegmentId, instance, PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
+ }
}
return idealState;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
index 6815ee3..191241d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
@@ -35,11 +35,19 @@ import org.testng.annotations.Test;
public class HelixHelperTest extends ControllerTest {
public static final String RESOURCE_NAME = "potato_OFFLINE";
public static final String INSTANCE_NAME = "Server_1.2.3.4_1234";
+ private String helixClusterName;
@BeforeClass
public void setUp() {
startZk();
startController();
+
+ IdealState idealState = new IdealState(RESOURCE_NAME);
+ idealState.setStateModelDefRef("OnlineOffline");
+ idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+ idealState.setReplicas("0");
+ helixClusterName = getHelixClusterName();
+ _helixAdmin.addResource(helixClusterName, RESOURCE_NAME, idealState);
}
/**
@@ -49,13 +57,6 @@ public class HelixHelperTest extends ControllerTest {
public void testWriteLargeIdealState() {
final int numSegments = 20000;
- IdealState idealState = new IdealState(RESOURCE_NAME);
- idealState.setStateModelDefRef("OnlineOffline");
- idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- idealState.setReplicas("0");
- String helixClusterName = getHelixClusterName();
- _helixAdmin.addResource(helixClusterName, RESOURCE_NAME, idealState);
-
HelixHelper.updateIdealState(_helixManager, RESOURCE_NAME, new Function<IdealState, IdealState>() {
@Override
public IdealState apply(@Nullable IdealState idealState) {
@@ -73,6 +74,34 @@ public class HelixHelperTest extends ControllerTest {
}
}
+ @Test
+ public void testPermanentIdealStateUpdaterException() {
+ Assert.assertTrue(catchExceptionInISUpdate(null));
+ Assert.assertFalse(catchExceptionInISUpdate("TestSegment"));
+ }
+
+ private boolean catchExceptionInISUpdate(String testSegment) {
+ boolean caughtException = false;
+ try {
+ aMethodWhichThrowsExceptionInUpdater(testSegment);
+ } catch (Exception e) {
+ caughtException = true;
+ }
+ return caughtException;
+ }
+
+ private void aMethodWhichThrowsExceptionInUpdater(String testSegment) {
+ HelixHelper.updateIdealState(_helixManager, RESOURCE_NAME, new Function<IdealState, IdealState>() {
+ @Override
+ public IdealState apply(@Nullable IdealState idealState) {
+ if (testSegment == null) {
+ throw new HelixHelper.PermanentUpdaterException("Throwing test exception for " + testSegment);
+ }
+ return idealState;
+ }
+ }, RetryPolicies.noDelayRetryPolicy(5));
+ }
+
@AfterClass
public void tearDown() {
stopController();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 5cbf88a..1e68bbc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1102,6 +1102,71 @@ public class PinotLLCRealtimeSegmentManagerTest {
Assert.assertFalse(status); // Committing segment metadata failed.
}
+ /**
+ * Tests the scenario where the ideal state for the new segment was already updated by some external thread, after step 2 and before step 3 of segment commit.
+ * This can happen if step 3 (ideal state update) took longer than {@link PinotLLCRealtimeSegmentManager#MAX_SEGMENT_COMPLETION_TIME_MILLIS},
+ * making the segment eligible for repairs by {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}
+ * @throws InvalidConfigException
+ */
+ @Test
+ public void testIdealStateAlreadyUpdated() throws InvalidConfigException {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+ String tableNameWithType = "tableName_REALTIME";
+ String rawTableName = "tableName";
+ int nPartitions = 4;
+ int nReplicas = 2;
+ int nInstances = 3;
+ setupSegmentManager(segmentManager, tableNameWithType, nPartitions, nReplicas, nInstances);
+
+ IdealState idealState = segmentManager._tableIdealState;
+ TableConfig tableConfig = segmentManager._tableConfigStore.getTableConfig(tableNameWithType);
+ PartitionAssignment partitionAssignment =
+ segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
+ idealState);
+ int partitionId = 0;
+ int seq = 0;
+ IdealStateBuilderUtil idealStateBuilderUtil = new IdealStateBuilderUtil(idealState, tableNameWithType);
+ String currentSegmentId = idealStateBuilderUtil.getSegment(partitionId, seq);
+ LLCSegmentName newSegment = new LLCSegmentName(rawTableName, partitionId, ++seq, System.currentTimeMillis());
+ String newSegmentId = newSegment.getSegmentName();
+ ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
+ IdealState idealStateCopy =
+ new IdealState((ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
+ segmentManager.updateIdealStateOnSegmentCompletion(idealState, currentSegmentId, newSegmentId, partitionAssignment);
+
+ // check ideal state updates
+ Assert.assertNotEquals(idealState, idealStateCopy);
+
+ // progress ideal state, but send update for old. Should not update
+ currentSegmentId = newSegmentId;
+ List<String> instances = idealStateBuilderUtil.getInstances(partitionId, seq);
+ idealStateBuilderUtil.setSegmentState(partitionId, seq, "ONLINE");
+ idealStateBuilderUtil.addConsumingSegment(partitionId, ++seq, nReplicas, instances);
+ newSegmentId = idealStateBuilderUtil.getSegment(partitionId, seq);
+ idealStateBuilderUtil.setSegmentState(partitionId, seq, "ONLINE");
+ idealStateBuilderUtil.addConsumingSegment(partitionId, ++seq, nReplicas, instances);
+ idealState = idealStateBuilderUtil.build();
+ idealStateCopy =
+ new IdealState((ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
+
+ segmentManager.updateIdealStateOnSegmentCompletion(idealState, currentSegmentId, newSegmentId, partitionAssignment);
+
+ // check no change
+ Assert.assertEquals(idealState, idealStateCopy);
+ }
+
+ private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName, int nPartitions, int nReplicas, int nInstances)
+ throws InvalidConfigException {
+
+ List<String> instances = getInstanceList(nInstances);
+ TableConfig tableConfig = makeTableConfig(rtTableName, nReplicas, DUMMY_HOST, DEFAULT_SERVER_TENANT);
+ IdealState idealState = PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(rtTableName, nReplicas, true);
+
+ segmentManager.addTableToStore(rtTableName, tableConfig, nPartitions);
+ segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
+ segmentManager.setupNewTable(tableConfig, idealState);
+ }
+
private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName)
throws InvalidConfigException {
final int nInstances = 6;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org