You are viewing a plain-text version of this content. The canonical link for it (the "here" hyperlink in the original page) is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ne...@apache.org on 2019/03/01 21:16:39 UTC

[incubator-pinot] branch master updated: Reduce MAX_SEGMENT_COMPLETION_TIME_MILLIS to 5 minutes (#3893)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4dc59ff  Reduce MAX_SEGMENT_COMPLETION_TIME_MILLIS to 5 minutes (#3893)
4dc59ff is described below

commit 4dc59ff95786f5f9c5e30c1b236c1282ea15efa8
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Mar 1 13:16:30 2019 -0800

    Reduce MAX_SEGMENT_COMPLETION_TIME_MILLIS to 5 minutes (#3893)
    
    Reduce the maximum segment completion time (previously MAX_SEGMENT_COMPLETION_TIME_MINS, now MAX_SEGMENT_COMPLETION_TIME_MILLIS) from 10 minutes to 5, and possibly lower in the future. Ten minutes is a long time for segment completion, and that entire window is added to our SLA for fixing consuming partitions, however frequently we run RealtimeSegmentValidationManager.
---
 .../pinot/common/utils/helix/HelixHelper.java      | 17 ++++++
 .../common/partition/IdealStateBuilderUtil.java    |  6 ++
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 60 +++++++++++++++-----
 .../pinot/controller/helix/HelixHelperTest.java    | 43 +++++++++++---
 .../PinotLLCRealtimeSegmentManagerTest.java        | 65 ++++++++++++++++++++++
 5 files changed, 170 insertions(+), 21 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
index fdf2deb..21f2153 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java
@@ -92,6 +92,9 @@ public class HelixHelper {
           IdealState updatedIdealState;
           try {
             updatedIdealState = updater.apply(idealStateCopy);
+          } catch (PermanentUpdaterException e) {
+            LOGGER.error("Caught permanent exception while updating ideal state for resource: {}", resourceName, e);
+            throw e;
           } catch (Exception e) {
             LOGGER.error("Caught exception while updating ideal state for resource: {}", resourceName, e);
             return false;
@@ -140,6 +143,20 @@ public class HelixHelper {
     }
   }
 
+  /**
+   * Exception to be thrown by updater function to exit from retry in {@link HelixHelper::updatedIdealState}
+   */
+  public static class PermanentUpdaterException extends RuntimeException {
+
+    public PermanentUpdaterException(String message) {
+      super(message);
+    }
+
+    public PermanentUpdaterException(Throwable cause) {
+      super(cause);
+    }
+  }
+
   public static void updateIdealState(final HelixManager helixManager, final String resourceName,
       final Function<IdealState, IdealState> updater, RetryPolicy policy) {
     updateIdealState(helixManager, resourceName, updater, policy, false);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java b/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java
index c55e758..1f0ac3e 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/partition/IdealStateBuilderUtil.java
@@ -45,6 +45,12 @@ public class IdealStateBuilderUtil {
     _idealState = new IdealState(tableNameWithType);
   }
 
+  public IdealStateBuilderUtil(IdealState idealState, String tableNameWithType) {
+    _tableName = tableNameWithType;
+    _rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    _idealState = idealState;
+  }
+
   public IdealState build() {
     return _idealState;
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index e814abe..82d2e31 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -107,7 +107,7 @@ public class PinotLLCRealtimeSegmentManager {
   private static final String METADATA_TEMP_DIR_SUFFIX = ".metadata.tmp";
   private static final String METADATA_EVENT_NOTIFIER_PREFIX = "metadata.event.notifier";
 
-  // Max time to wait for all LLC segments to complete committing their metadata.
+  // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
 
   // TODO: make this configurable with default set to 10
@@ -118,7 +118,7 @@ public class PinotLLCRealtimeSegmentManager {
    * This includes any backoffs and retries for the steps 2 and 3
    * The segment will be eligible for repairs by the validation manager, if the time  exceeds this value
    */
-  private static int MAX_SEGMENT_COMPLETION_TIME_MINS = 10;
+  private static int MAX_SEGMENT_COMPLETION_TIME_MILLIS = 300_000; // 5 MINUTES
 
   private static PinotLLCRealtimeSegmentManager INSTANCE = null;
 
@@ -503,7 +503,7 @@ public class PinotLLCRealtimeSegmentManager {
     // TODO Introduce a controller failure here for integration testing
 
     // When multiple segments of the same table complete around the same time it is possible that
-    // the idealstate udpate fails due to contention. We serialize the updates to the idealstate
+    // the idealstate update fails due to contention. We serialize the updates to the idealstate
     // to reduce this contention. We may still contend with RetentionManager, or other updates
     // to idealstate from other controllers, but then we have the retry mechanism to get around that.
     // hash code can be negative, so make sure we are getting a positive lock index
@@ -514,7 +514,10 @@ public class PinotLLCRealtimeSegmentManager {
       updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentNameStr, newSegmentNameStr,
           partitionAssignment);
       LOGGER.info("Changed {} to ONLINE and created {} in CONSUMING", committingSegmentNameStr, newSegmentNameStr);
-    } finally {
+    } catch (Exception e) {
+      LOGGER.error("Caught exception when updating ideal state for {}", committingSegmentNameStr, e);
+      return false;
+    }  finally {
       lock.unlock();
     }
 
@@ -987,6 +990,18 @@ public class PinotLLCRealtimeSegmentManager {
       @Nullable
       @Override
       public IdealState apply(@Nullable IdealState idealState) {
+        // When segment completion begins, the zk metadata is updated, followed by ideal state.
+        // We allow only {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a segment to complete,
+        // after which the segment is eligible for repairs by the {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}
+        // After updating metadata, if more than {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms elapse and ideal state is still not updated,
+        // the segment could have already been fixed by {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}
+        // Therefore, we do not want to proceed with ideal state update if max segment completion time has exceeded
+        if (isExceededMaxSegmentCompletionTime(tableNameWithType, currentSegmentId, System.currentTimeMillis())) {
+          LOGGER.error("Exceeded max segment completion time. Skipping ideal state update for segment {}",
+              currentSegmentId);
+          throw new HelixHelper.PermanentUpdaterException(
+              "Exceeded max segment completion time for segment " + currentSegmentId);
+        }
         return updateIdealStateOnSegmentCompletion(idealState, currentSegmentId, newSegmentId, partitionAssignment);
       }
     }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f));
@@ -1052,13 +1067,29 @@ public class PinotLLCRealtimeSegmentManager {
     Stat stat = new Stat();
     LLCRealtimeSegmentZKMetadata metadata = getRealtimeSegmentZKMetadata(tableNameWithType, segmentId, stat);
     long metadataUpdateTime = stat.getMtime();
-    if (now < metadataUpdateTime + TimeUnit.MILLISECONDS.convert(MAX_SEGMENT_COMPLETION_TIME_MINS, TimeUnit.MINUTES)) {
+    if (now < metadataUpdateTime + MAX_SEGMENT_COMPLETION_TIME_MILLIS) {
       LOGGER.info("Too soon to correct segment:{} updateTime: {} now:{}", segmentId, metadataUpdateTime, now);
       return true;
     }
     return false;
   }
 
+  /**
+   *
+   * Returns true if more than {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms have elapsed since segment metadata update
+   */
+  private boolean isExceededMaxSegmentCompletionTime(String tableNameWithType, String segmentId, long now) {
+    Stat stat = new Stat();
+    LLCRealtimeSegmentZKMetadata metadata = getRealtimeSegmentZKMetadata(tableNameWithType, segmentId, stat);
+    long metadataUpdateTime = stat.getMtime();
+    if (now > metadataUpdateTime + MAX_SEGMENT_COMPLETION_TIME_MILLIS) {
+      LOGGER.info("Segment:{}, Now:{}, metadataUpdateTime:{}, Exceeded MAX_SEGMENT_COMPLETION_TIME_MILLIS:{}",
+          segmentId, now, metadataUpdateTime, MAX_SEGMENT_COMPLETION_TIME_MILLIS);
+      return true;
+    }
+    return false;
+  }
+
   private boolean isAllInstancesInState(Map<String, String> instanceStateMap, String state) {
     return instanceStateMap.values().stream().allMatch(value -> value.equals(state));
   }
@@ -1357,7 +1388,7 @@ public class PinotLLCRealtimeSegmentManager {
       @Nonnull String currentSegmentId, @Nonnull String newSegmentId,
       @Nonnull PartitionAssignment partitionAssignment) {
 
-    Map<String, List<String>> instanceAssignments = null;
+    Map<String, List<String>> instanceAssignments;
 
     RealtimeSegmentAssignmentStrategy strategy = new ConsumingSegmentAssignmentStrategy();
     try {
@@ -1375,15 +1406,16 @@ public class PinotLLCRealtimeSegmentManager {
           PinotHelixSegmentOnlineOfflineStateModelGenerator.ONLINE_STATE);
     }
 
-    // We may have (for whatever reason) a different instance list in the idealstate for the new segment.
-    // If so, clear it, and then set the instance state for the set of instances that we know should be there.
+    // The {@link RealtimeSegmentValidationManager} will fix metadata and ideal state after {@link MAX_SEGMENT_COMPLETION_TIME_MILLIS} of inactivity on the committing segment
+    // If the ideal state update during completion took longer than {@link MAX_SEGMENT_COMPLETION_TIME_MILLIS},the update could already have been done by the fixer thread .
+    // We do not want to overwrite the ideal state. It is possible that the new segment created by the fixer has already progressed to ONLINE.
+    // If we let the below update happen, we will be bringing an ONLINE segment back to CONSUMING, and end up with 2 CONSUMING segments for the partition
     Map<String, String> stateMap = idealState.getInstanceStateMap(newSegmentId);
-    if (stateMap != null) {
-      stateMap.clear();
-    }
-    for (String instance : newSegmentInstances) {
-      idealState
-          .setPartitionState(newSegmentId, instance, PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
+    if (stateMap == null) {
+      for (String instance : newSegmentInstances) {
+        idealState
+            .setPartitionState(newSegmentId, instance, PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE);
+      }
     }
 
     return idealState;
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
index 6815ee3..191241d 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/HelixHelperTest.java
@@ -35,11 +35,19 @@ import org.testng.annotations.Test;
 public class HelixHelperTest extends ControllerTest {
   public static final String RESOURCE_NAME = "potato_OFFLINE";
   public static final String INSTANCE_NAME = "Server_1.2.3.4_1234";
+  private String helixClusterName;
 
   @BeforeClass
   public void setUp() {
     startZk();
     startController();
+
+    IdealState idealState = new IdealState(RESOURCE_NAME);
+    idealState.setStateModelDefRef("OnlineOffline");
+    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
+    idealState.setReplicas("0");
+    helixClusterName = getHelixClusterName();
+    _helixAdmin.addResource(helixClusterName, RESOURCE_NAME, idealState);
   }
 
   /**
@@ -49,13 +57,6 @@ public class HelixHelperTest extends ControllerTest {
   public void testWriteLargeIdealState() {
     final int numSegments = 20000;
 
-    IdealState idealState = new IdealState(RESOURCE_NAME);
-    idealState.setStateModelDefRef("OnlineOffline");
-    idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
-    idealState.setReplicas("0");
-    String helixClusterName = getHelixClusterName();
-    _helixAdmin.addResource(helixClusterName, RESOURCE_NAME, idealState);
-
     HelixHelper.updateIdealState(_helixManager, RESOURCE_NAME, new Function<IdealState, IdealState>() {
       @Override
       public IdealState apply(@Nullable IdealState idealState) {
@@ -73,6 +74,34 @@ public class HelixHelperTest extends ControllerTest {
     }
   }
 
+  @Test
+  public void testPermanentIdealStateUpdaterException() {
+    Assert.assertTrue(catchExceptionInISUpdate(null));
+    Assert.assertFalse(catchExceptionInISUpdate("TestSegment"));
+  }
+
+  private boolean catchExceptionInISUpdate(String testSegment) {
+    boolean caughtException = false;
+    try {
+      aMethodWhichThrowsExceptionInUpdater(testSegment);
+    } catch (Exception e) {
+      caughtException = true;
+    }
+    return caughtException;
+  }
+
+  private void aMethodWhichThrowsExceptionInUpdater(String testSegment) {
+    HelixHelper.updateIdealState(_helixManager, RESOURCE_NAME, new Function<IdealState, IdealState>() {
+      @Override
+      public IdealState apply(@Nullable IdealState idealState) {
+        if (testSegment == null) {
+          throw new HelixHelper.PermanentUpdaterException("Throwing test exception for " + testSegment);
+        }
+        return idealState;
+      }
+    }, RetryPolicies.noDelayRetryPolicy(5));
+  }
+
   @AfterClass
   public void tearDown() {
     stopController();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 5cbf88a..1e68bbc 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1102,6 +1102,71 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Assert.assertFalse(status); // Committing segment metadata failed.
   }
 
+  /**
+   * Tests the scenario where ideal state for new segment, was already updated by some external thread, after step 2 and before step 3 of segment commit.
+   * This can happen if step 3 (ideal state update) took longer than {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS},
+   * making the segment eligible for repairs by {@link org.apache.pinot.controller.validation.RealtimeSegmentValidationManager}
+   * @throws InvalidConfigException
+   */
+  @Test
+  public void testIdealStateAlreadyUpdated() throws InvalidConfigException {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager(null);
+    String tableNameWithType = "tableName_REALTIME";
+    String rawTableName = "tableName";
+    int nPartitions = 4;
+    int nReplicas = 2;
+    int nInstances = 3;
+    setupSegmentManager(segmentManager, tableNameWithType, nPartitions, nReplicas, nInstances);
+
+    IdealState idealState = segmentManager._tableIdealState;
+    TableConfig tableConfig = segmentManager._tableConfigStore.getTableConfig(tableNameWithType);
+    PartitionAssignment partitionAssignment =
+        segmentManager._partitionAssignmentGenerator.getStreamPartitionAssignmentFromIdealState(tableConfig,
+            idealState);
+    int partitionId = 0;
+    int seq = 0;
+    IdealStateBuilderUtil idealStateBuilderUtil = new IdealStateBuilderUtil(idealState, tableNameWithType);
+    String currentSegmentId = idealStateBuilderUtil.getSegment(partitionId, seq);
+    LLCSegmentName newSegment = new LLCSegmentName(rawTableName, partitionId, ++seq, System.currentTimeMillis());
+    String newSegmentId = newSegment.getSegmentName();
+    ZNRecordSerializer znRecordSerializer = new ZNRecordSerializer();
+    IdealState idealStateCopy =
+        new IdealState((ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
+    segmentManager.updateIdealStateOnSegmentCompletion(idealState, currentSegmentId, newSegmentId, partitionAssignment);
+
+    // check ideal state updates
+    Assert.assertNotEquals(idealState, idealStateCopy);
+
+    // progress ideal state, but send update for old. Should not update
+    currentSegmentId = newSegmentId;
+    List<String> instances = idealStateBuilderUtil.getInstances(partitionId, seq);
+    idealStateBuilderUtil.setSegmentState(partitionId, seq, "ONLINE");
+    idealStateBuilderUtil.addConsumingSegment(partitionId, ++seq, nReplicas, instances);
+    newSegmentId = idealStateBuilderUtil.getSegment(partitionId, seq);
+    idealStateBuilderUtil.setSegmentState(partitionId, seq, "ONLINE");
+    idealStateBuilderUtil.addConsumingSegment(partitionId, ++seq, nReplicas, instances);
+    idealState = idealStateBuilderUtil.build();
+    idealStateCopy =
+        new IdealState((ZNRecord) znRecordSerializer.deserialize(znRecordSerializer.serialize(idealState.getRecord())));
+
+    segmentManager.updateIdealStateOnSegmentCompletion(idealState, currentSegmentId, newSegmentId, partitionAssignment);
+
+    // check no change
+    Assert.assertEquals(idealState, idealStateCopy);
+  }
+
+  private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName, int nPartitions, int nReplicas, int nInstances)
+      throws InvalidConfigException {
+
+    List<String> instances = getInstanceList(nInstances);
+    TableConfig tableConfig = makeTableConfig(rtTableName, nReplicas, DUMMY_HOST, DEFAULT_SERVER_TENANT);
+    IdealState idealState = PinotTableIdealStateBuilder.buildEmptyRealtimeIdealStateFor(rtTableName, nReplicas, true);
+
+    segmentManager.addTableToStore(rtTableName, tableConfig, nPartitions);
+    segmentManager._partitionAssignmentGenerator.setConsumingInstances(instances);
+    segmentManager.setupNewTable(tableConfig, idealState);
+  }
+
   private void setupSegmentManager(FakePinotLLCRealtimeSegmentManager segmentManager, String rtTableName)
       throws InvalidConfigException {
     final int nInstances = 6;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org