Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/09/14 19:22:48 UTC

[GitHub] [pinot] sajjad-moradi commented on a change in pull request #6778: upload missing LLC segment to segment store by controller periodic task

sajjad-moradi commented on a change in pull request #6778:
URL: https://github.com/apache/pinot/pull/6778#discussion_r708513119



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1297,110 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   *
+   * TODO: Add an on-demand way to upload LLC segment to deep store for a specific table.
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    long retentionMs =
+        TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+            .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        TimeUnit.MILLISECONDS,
+        retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName);
+    for (String segmentName : segmentNames) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      try {
+        // Only fix recently created segment. Validate segment creation time based on name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+          continue;
+        }
+
+        Stat stat = new Stat();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // Only fix the committed llc segment without deep store copy
+        if (segmentZKMetadata.getStatus() != Status.DONE
+            || !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          continue;
+        }
+        // Skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped fixing LLC segment {} which is already out of retention", segmentName);
+          continue;
+        }
+        // Delay the fix to next round if not enough time elapsed since segment metadata update
+        if (!isExceededMinTimeToFixDeepStoreCopy(stat)) {
+          LOGGER.info(
+              "Delay fix for {} to next round due to not enough time elapsed since segment metadata update",

Review comment:
       Again, could you expand this message to make clear that the "fix" refers to the segment's missing deep store copy?
   

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
##########
@@ -727,6 +727,23 @@ public SimpleHttpResponse uploadSegment(URI uri, String segmentName, InputStream
     return uploadSegment(uri, segmentName, inputStream, null, parameters, DEFAULT_SOCKET_TIMEOUT_MS);
   }
 
+  /**
+   * Controller periodic task uses this endpoint to ask servers to upload committed llc segment to segment store if missing.
+   * @param uri The uri to ask servers to upload segment to segment store
+   * @return the uploaded segment download url from segment store
+   * @throws URISyntaxException
+   * @throws IOException
+   * @throws HttpErrorStatusException
+   */
+  public String uploadToSegmentStore(String uri)

Review comment:
       @liuchang0520 This class is responsible for uploading and downloading files. Adding this new method extends its responsibility, which goes against the Single Responsibility Principle.
   I suggest we create a new class, `DeepStoreMissingSegmentCorrector` (or maybe some better name?), with a single public method, `uploadToDeepStoreIfMissing`. Instead of adding `uploadToSegmentStore` to `FileUploadDownloadClient`, we can create the SSLContext and HTTP client in the constructor of the new class and use that client in `uploadToDeepStoreIfMissing` to instruct servers to upload missing segments to deep store.
   The new class can be instantiated in `BaseControllerStarter` right after `_pinotLLCRealtimeSegmentManager` is instantiated. You can pass in the subset of the SSL config required for creating the SSLContext object as a constructor parameter. Later in `BaseControllerStarter`, this instance can be injected into the `RealtimeSegmentValidationManager` object to trigger the upload of missing segments to deep store.
   With this refactoring, all the new logic lives in one place, and nothing new is added to `PinotLLCRealtimeSegmentManager`, which is already big.
   We can chat on Slack or Zoom if I didn't do a good job explaining the refactoring.
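   To make the proposed shape concrete, here is a rough sketch using only the JDK. It is not the code from the PR. The `UploadRequester` interface, the second constructor, the single-URI method signature, the use of POST, and the assumption that the server replies with the download URL as the response body are all illustrative guesses. It asks one server and does not retry, in line with the Javadoc in the diff.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import javax.net.ssl.SSLContext;

/** Hypothetical sketch of the class proposed in this comment; names and signatures are assumptions. */
final class DeepStoreMissingSegmentCorrector {

  /** Hypothetical transport abstraction so the server call can be stubbed in tests. */
  public interface UploadRequester {
    /** Asks one server to upload its segment copy; returns the deep store download URL. */
    String requestUpload(URI serverUploadUri) throws IOException, InterruptedException;
  }

  private final UploadRequester _requester;

  /** Builds the HTTP client once, from the SSLContext handed in by the controller starter. */
  DeepStoreMissingSegmentCorrector(SSLContext sslContext) {
    HttpClient httpClient = HttpClient.newBuilder().sslContext(sslContext).build();
    _requester = uri -> {
      // Assumption: the server endpoint accepts an empty POST and returns the URL as plain text.
      HttpRequest request = HttpRequest.newBuilder(uri).POST(HttpRequest.BodyPublishers.noBody()).build();
      HttpResponse<String> response = httpClient.send(request, HttpResponse.BodyHandlers.ofString());
      if (response.statusCode() != 200) {
        throw new IOException("Upload request to " + uri + " failed with status " + response.statusCode());
      }
      return response.body();
    };
  }

  /** Alternative constructor that takes the transport directly (illustrative, mainly for tests). */
  DeepStoreMissingSegmentCorrector(UploadRequester requester) {
    _requester = requester;
  }

  /**
   * Asks a single server to upload the segment to deep store, without retrying.
   * Returns the download URL on success, or empty if the request failed.
   */
  Optional<String> uploadToDeepStoreIfMissing(URI serverUploadUri) {
    try {
      return Optional.of(_requester.requestUpload(serverUploadUri));
    } catch (IOException e) {
      // No retry here: the segment can still be downloaded from peer servers.
      return Optional.empty();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the periodic task can stop promptly.
      Thread.currentThread().interrupt();
      return Optional.empty();
    }
  }
}
```

   In this sketch the caller would still be responsible for writing the returned URL into the segment ZK metadata. Whether that step also moves into the new class is a design choice the sketch leaves open.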

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1297,110 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   *
+   * TODO: Add an on-demand way to upload LLC segment to deep store for a specific table.
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    long retentionMs =
+        TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+            .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        TimeUnit.MILLISECONDS,
+        retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName);
+    for (String segmentName : segmentNames) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      try {
+        // Only fix recently created segment. Validate segment creation time based on name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+          continue;
+        }
+
+        Stat stat = new Stat();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // Only fix the committed llc segment without deep store copy
+        if (segmentZKMetadata.getStatus() != Status.DONE
+            || !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          continue;
+        }
+        // Skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped fixing LLC segment {} which is already out of retention", segmentName);

Review comment:
       Could you expand this message to make clear that the "fix" refers to the segment's missing deep store copy?
   

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1243,4 +1297,110 @@ private int getMaxNumPartitionsPerInstance(InstancePartitions instancePartitions
       return (numPartitions + numInstancesPerReplicaGroup - 1) / numInstancesPerReplicaGroup;
     }
   }
+
+  /**
+   * Fix the missing LLC segment in deep store by asking servers to upload, and add deep store download uri in ZK.
+   * Since uploading to deep store involves expensive compression step (first tar up the segment and then upload),
+   * we don't want to retry the uploading. Segment without deep store copy can still be downloaded from peer servers.
+   *
+   * @see <a href="
+   * https://cwiki.apache.org/confluence/display/PINOT/By-passing+deep-store+requirement+for+Realtime+segment+completion
+   * "> By-passing deep-store requirement for Realtime segment completion:Failure cases and handling</a>
+   *
+   * TODO: Add an on-demand way to upload LLC segment to deep store for a specific table.
+   */
+  public void uploadToDeepStoreIfMissing(TableConfig tableConfig) {
+    String realtimeTableName = tableConfig.getTableName();
+
+    if (_isStopping) {
+      LOGGER.info(
+          "Skipped fixing deep store copy of LLC segments for table {}, because segment manager is stopping.",
+          realtimeTableName);
+      return;
+    }
+
+    long retentionMs =
+        TimeUnit.valueOf(tableConfig.getValidationConfig().getRetentionTimeUnit().toUpperCase())
+            .toMillis(Long.parseLong(tableConfig.getValidationConfig().getRetentionTimeValue()));
+    RetentionStrategy retentionStrategy = new TimeRetentionStrategy(
+        TimeUnit.MILLISECONDS,
+        retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
+
+    // Iterate through LLC segments and upload missing deep store copy by following steps:
+    //  1. Ask servers which have online segment replica to upload to deep store.
+    //     Servers return deep store download url after successful uploading.
+    //  2. Update the LLC segment ZK metadata by adding deep store download url.
+    List<String> segmentNames = ZKMetadataProvider.getLLCRealtimeSegments(_propertyStore, realtimeTableName);
+    for (String segmentName : segmentNames) {
+      // TODO: Reevaluate the parallelism of upload operation. Currently the upload operation is conducted in
+      //  sequential order. Compared with parallel mode, it will take longer time but put less pressure on
+      //  servers. We may need to rate control the upload request if it is changed to be in parallel.
+      try {
+        // Only fix recently created segment. Validate segment creation time based on name.
+        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+        if (getCurrentTimeMs() - llcSegmentName.getCreationTimeMs() > _deepStoreLLCSegmentUploadRetryRangeMs) {
+          continue;
+        }
+
+        Stat stat = new Stat();
+        SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName, stat);
+        // Only fix the committed llc segment without deep store copy
+        if (segmentZKMetadata.getStatus() != Status.DONE
+            || !CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl())) {
+          continue;
+        }
+        // Skip the fix for the segment if it is already out of retention.
+        if (retentionStrategy.isPurgeable(realtimeTableName, segmentZKMetadata)) {
+          LOGGER.info("Skipped fixing LLC segment {} which is already out of retention", segmentName);
+          continue;
+        }
+        // Delay the fix to next round if not enough time elapsed since segment metadata update
+        if (!isExceededMinTimeToFixDeepStoreCopy(stat)) {
+          LOGGER.info(
+              "Delay fix for {} to next round due to not enough time elapsed since segment metadata update",
+              segmentName);
+          continue;
+        }
+        LOGGER.info("Fixing LLC segment {} whose deep store copy is unavailable", segmentName);
+
+        // Find servers which have online replica
+        List<URI> peerSegmentURIs = PeerServerSegmentFinder
+            .getPeerServerURIs(segmentName, CommonConstants.HTTP_PROTOCOL, _helixManager);

Review comment:
       Why not HTTPS? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


