You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/01/03 21:34:21 UTC

[GitHub] [pinot] jackjlli commented on a change in pull request #7969: extend SegmentDirectoryLoader interface and refactor BaseTableDataManager

jackjlli commented on a change in pull request #7969:
URL: https://github.com/apache/pinot/pull/7969#discussion_r777714621



##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -333,92 +315,131 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
 
   @Override
   public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
-      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata segmentMetadata)
       throws Exception {
-    if (!isNewSegment(zkMetadata, localMetadata)) {
-      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
-          _tableNameWithType, localMetadata.getCrc());
+    // Non-null segment metadata means the segment has already been loaded.
+    if (segmentMetadata != null) {
+      if (hasSameCRC(zkMetadata, segmentMetadata)) {
+        // Simply returns if the CRC hasn't changed. The table config may have changed
+        // since segment is loaded, but that is handled by reloadSegment() method.
+        LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc());
+      } else {
+        // Download the raw segment, reprocess and load it if the CRC has changed.
+        LOGGER.info("Segment: {} of table: {} already loaded but its crc: {} differs from new crc: {}", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc(), zkMetadata.getCrc());
+        downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+            ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType));
+      }
       return;
     }
 
-    // Try to recover if no local metadata is provided.
-    if (localMetadata == null) {
-      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
-      localMetadata = recoverSegmentQuietly(segmentName);
-      if (!isNewSegment(zkMetadata, localMetadata)) {
-        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
-            localMetadata.getCrc());
-        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
-          return;
-        }
-        // Set local metadata to null to indicate that the local segment fails to load,
-        // although it exists and has same crc with the remote one.
-        localMetadata = null;
-      }
+    // For local tier backend, try to recover the segment from potential
+    // reload failure. Continue upon any failure.
+    File indexDir = getSegmentDataDir(segmentName);
+    recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+    // Creates the SegmentDirectory object to access the segment metadata that
+    // may be from local tier backend or remote tier backend.
+    SegmentDirectoryLoaderContext segmentLoaderContext =
+        new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getInstanceId(),
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentLoader =
+        SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+    SegmentDirectory segmentDirectory = null;
+    try {
+      segmentDirectory = segmentLoader.load(indexDir.toURI(), segmentLoaderContext);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to create SegmentDirectory for segment: {} of table: {} due to error: {}", segmentName,
+          _tableNameWithType, e.getMessage());
     }
 
-    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
-        segmentName, _tableNameWithType);
-
-    // Download segment and replace the local one, either due to failure to recover local segment,
-    // or the segment data is updated and has new CRC now.
-    if (localMetadata == null) {
-      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
-    } else {
-      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
-          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    // Download the raw segment, reprocess and load it if it is never loaded or its CRC has changed.
+    if (segmentDirectory == null || !hasSameCRC(zkMetadata, segmentDirectory.getSegmentMetadata())) {
+      LOGGER.info("Segment: {} of table: {} not exist or its crc: {} differs from new crc: {}", segmentName,
+          _tableNameWithType, (segmentDirectory == null) ? "none" : segmentDirectory.getSegmentMetadata().getCrc(),
+          zkMetadata.getCrc());
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+          ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType));
+      return;
     }
-    File indexDir = downloadSegment(segmentName, zkMetadata);
-    addSegment(indexDir, indexLoadingConfig);
-    LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
-        zkMetadata.getCrc());
-  }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
-    return true;
+    try {
+      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
+      if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema)) {
+        // The loaded segment is still consistent with current table config or schema.
+        LOGGER.info("Segment: {} of table: {} is consistent with table config and schema", segmentName,
+            _tableNameWithType);
+        addSegment(ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig, schema));
+        return;
+      }
+      // If any discrepancy is found, get the segment from tier backend, reprocess and load it.
+      // Please note that the segment is from tier backed, not deep store, for incremental processing.

Review comment:
       Typo in the comment above: `s/backed/backend/` ("tier backed" should read "tier backend").

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -275,53 +278,32 @@ public void addSegmentError(String segmentName, SegmentErrorInfo segmentErrorInf
 
   @Override
   public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingConfig, SegmentZKMetadata zkMetadata,
-      SegmentMetadata localMetadata, @Nullable Schema schema, boolean forceDownload)
+      SegmentMetadata segmentMetadata, @Nullable Schema schema, boolean forceDownload)
       throws Exception {
-    File indexDir = localMetadata.getIndexDir();
-    Preconditions.checkState(indexDir.isDirectory(), "Index directory: %s is not a directory", indexDir);
-
-    File parentFile = indexDir.getParentFile();
-    File segmentBackupDir =
-        new File(parentFile, indexDir.getName() + CommonConstants.Segment.SEGMENT_BACKUP_DIR_SUFFIX);
-
+    File indexDir = getSegmentDataDir(segmentName);
+    // Create backup dir to make segment reloading atomic for local tier backend.
+    LoaderUtils.createBackup(indexDir);
     try {
-      // First rename index directory to segment backup directory so that original segment have all file descriptors
-      // point to the segment backup directory to ensure original segment serves queries properly
-
-      // Rename index directory to segment backup directory (atomic)
-      Preconditions.checkState(indexDir.renameTo(segmentBackupDir),
-          "Failed to rename index directory: %s to segment backup directory: %s", indexDir, segmentBackupDir);
-
-      // Download from remote or copy from local backup directory into index directory,
-      // and then continue to load the segment from index directory.
-      boolean shouldDownload = forceDownload || !hasSameCRC(zkMetadata, localMetadata);
-      if (shouldDownload && allowDownload(segmentName, zkMetadata)) {
-        if (forceDownload) {
-          LOGGER.info("Segment: {} of table: {} is forced to download", segmentName, _tableNameWithType);
-        } else {
-          LOGGER.info("Download segment:{} of table: {} as local crc: {} mismatches remote crc: {}", segmentName,
-              _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
-        }
-        indexDir = downloadSegment(segmentName, zkMetadata);
+      boolean shouldDownloadRawSegment = forceDownload || !hasSameCRC(zkMetadata, segmentMetadata);
+      if (shouldDownloadRawSegment && allowDownloadRawSegment(segmentName, zkMetadata)) {
+        downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata, schema);
       } else {
-        LOGGER.info("Reload the local copy of segment: {} of table: {}", segmentName, _tableNameWithType);
-        FileUtils.copyDirectory(segmentBackupDir, indexDir);
+        SegmentDirectoryLoaderContext segmentLoaderContext =
+            new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getInstanceId(),
+                segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+        SegmentDirectoryLoader segmentLoader =
+            SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+        // Get the segment from tier backend, reprocess and load it with current table config and schema.
+        // Please note that the segment is from tier backed, not deep store, for incremental processing.

Review comment:
       Typo in the comment above: `s/backed/backend/` ("tier backed" should read "tier backend").

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
##########
@@ -333,92 +315,131 @@ public void reloadSegment(String segmentName, IndexLoadingConfig indexLoadingCon
 
   @Override
   public void addOrReplaceSegment(String segmentName, IndexLoadingConfig indexLoadingConfig,
-      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata localMetadata)
+      SegmentZKMetadata zkMetadata, @Nullable SegmentMetadata segmentMetadata)
       throws Exception {
-    if (!isNewSegment(zkMetadata, localMetadata)) {
-      LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
-          _tableNameWithType, localMetadata.getCrc());
+    // Non-null segment metadata means the segment has already been loaded.
+    if (segmentMetadata != null) {
+      if (hasSameCRC(zkMetadata, segmentMetadata)) {
+        // Simply returns if the CRC hasn't changed. The table config may have changed
+        // since segment is loaded, but that is handled by reloadSegment() method.
+        LOGGER.info("Segment: {} of table: {} has crc: {} same as before, already loaded, do nothing", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc());
+      } else {
+        // Download the raw segment, reprocess and load it if the CRC has changed.
+        LOGGER.info("Segment: {} of table: {} already loaded but its crc: {} differs from new crc: {}", segmentName,
+            _tableNameWithType, segmentMetadata.getCrc(), zkMetadata.getCrc());
+        downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+            ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType));
+      }
       return;
     }
 
-    // Try to recover if no local metadata is provided.
-    if (localMetadata == null) {
-      LOGGER.info("Segment: {} of table: {} is not loaded, checking disk", segmentName, _tableNameWithType);
-      localMetadata = recoverSegmentQuietly(segmentName);
-      if (!isNewSegment(zkMetadata, localMetadata)) {
-        LOGGER.info("Segment: {} of table {} has crc: {} same as before, loading", segmentName, _tableNameWithType,
-            localMetadata.getCrc());
-        if (loadSegmentQuietly(segmentName, indexLoadingConfig)) {
-          return;
-        }
-        // Set local metadata to null to indicate that the local segment fails to load,
-        // although it exists and has same crc with the remote one.
-        localMetadata = null;
-      }
+    // For local tier backend, try to recover the segment from potential
+    // reload failure. Continue upon any failure.
+    File indexDir = getSegmentDataDir(segmentName);
+    recoverReloadFailureQuietly(_tableNameWithType, segmentName, indexDir);
+
+    // Creates the SegmentDirectory object to access the segment metadata that
+    // may be from local tier backend or remote tier backend.
+    SegmentDirectoryLoaderContext segmentLoaderContext =
+        new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getInstanceId(),
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentLoader =
+        SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+    SegmentDirectory segmentDirectory = null;
+    try {
+      segmentDirectory = segmentLoader.load(indexDir.toURI(), segmentLoaderContext);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to create SegmentDirectory for segment: {} of table: {} due to error: {}", segmentName,
+          _tableNameWithType, e.getMessage());
     }
 
-    Preconditions.checkState(allowDownload(segmentName, zkMetadata), "Segment: %s of table: %s does not allow download",
-        segmentName, _tableNameWithType);
-
-    // Download segment and replace the local one, either due to failure to recover local segment,
-    // or the segment data is updated and has new CRC now.
-    if (localMetadata == null) {
-      LOGGER.info("Download segment: {} of table: {} as no good one exists locally", segmentName, _tableNameWithType);
-    } else {
-      LOGGER.info("Download segment: {} of table: {} as local crc: {} mismatches remote crc: {}.", segmentName,
-          _tableNameWithType, localMetadata.getCrc(), zkMetadata.getCrc());
+    // Download the raw segment, reprocess and load it if it is never loaded or its CRC has changed.
+    if (segmentDirectory == null || !hasSameCRC(zkMetadata, segmentDirectory.getSegmentMetadata())) {
+      LOGGER.info("Segment: {} of table: {} not exist or its crc: {} differs from new crc: {}", segmentName,
+          _tableNameWithType, (segmentDirectory == null) ? "none" : segmentDirectory.getSegmentMetadata().getCrc(),
+          zkMetadata.getCrc());
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+          ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType));
+      return;
     }
-    File indexDir = downloadSegment(segmentName, zkMetadata);
-    addSegment(indexDir, indexLoadingConfig);
-    LOGGER.info("Downloaded and loaded segment: {} of table: {} with crc: {}", segmentName, _tableNameWithType,
-        zkMetadata.getCrc());
-  }
 
-  protected boolean allowDownload(String segmentName, SegmentZKMetadata zkMetadata) {
-    return true;
+    try {
+      Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
+      if (!ImmutableSegmentLoader.needPreprocess(segmentDirectory, indexLoadingConfig, schema)) {
+        // The loaded segment is still consistent with current table config or schema.
+        LOGGER.info("Segment: {} of table: {} is consistent with table config and schema", segmentName,
+            _tableNameWithType);
+        addSegment(ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig, schema));
+        return;
+      }
+      // If any discrepancy is found, get the segment from tier backend, reprocess and load it.
+      // Please note that the segment is from tier backed, not deep store, for incremental processing.
+      LOGGER.info("Segment: {} of table: {} needs reprocess with table config and schema", segmentName,
+          _tableNameWithType);
+      // Close the stale SegmentDirectory object before loading the newly processed segment.
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      segmentLoader.download(indexDir, segmentLoaderContext);
+      processAndLoadSegment(segmentName, indexDir, indexLoadingConfig, schema);
+      LOGGER.info("Segment: {} of table: {} is reprocessed and loaded", segmentName, _tableNameWithType);
+    } catch (Exception e) {
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      LOGGER.error("Failed to reprocess and load segment: {} of table: {}", segmentName, _tableNameWithType, e);
+      downloadRawSegmentAndProcess(segmentName, indexLoadingConfig, zkMetadata,
+          ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType));
+    }
   }
 
-  protected File downloadSegment(String segmentName, SegmentZKMetadata zkMetadata)
+  /**
+   * This method downloads the raw segment from the deep store and process it, mainly
+   * for cases where segment CRC has changed or the existing segment fails to load.
+   */
+  private void downloadRawSegmentAndProcess(String segmentName, IndexLoadingConfig indexLoadingConfig,
+      SegmentZKMetadata zkMetadata, @Nullable Schema schema)
       throws Exception {
-    // TODO: may support download from peer servers for RealTime table.
-    return downloadSegmentFromDeepStore(segmentName, zkMetadata);
+    Preconditions.checkState(allowDownloadRawSegment(segmentName, zkMetadata),
+        "Segment: %s of table: %s does not allow download raw segment", segmentName, _tableNameWithType);
+
+    File indexDir = downloadRawSegment(segmentName, zkMetadata);
+    processAndLoadSegment(segmentName, indexDir, indexLoadingConfig, schema);
+    LOGGER.info("Downloaded raw segment: {} of table: {} with crc: {} and loaded it", segmentName,
+        _tableNameWithType, zkMetadata.getCrc());
   }
 
-  /**
-   * Server restart during segment reload might leave segment directory in inconsistent state, like the index
-   * directory might not exist but segment backup directory existed. This method tries to recover from reload
-   * failure before checking the existence of the index directory and loading segment metadata from it.
-   */
-  private SegmentMetadata recoverSegmentQuietly(String segmentName) {
-    File indexDir = getSegmentDataDir(segmentName);
+  private void processAndLoadSegment(String segmentName, File indexDir, IndexLoadingConfig indexLoadingConfig,
+      @Nullable Schema schema)
+      throws Exception {
+    // Preprocess the segment locally with current table config and schema.
+    ImmutableSegmentLoader.preprocess(indexDir, indexLoadingConfig, schema);
+
+    SegmentDirectoryLoaderContext segmentLoaderContext =
+        new SegmentDirectoryLoaderContext(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getInstanceId(),
+            segmentName, indexLoadingConfig.getSegmentDirectoryConfigs());
+    SegmentDirectoryLoader segmentLoader =
+        SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader(indexLoadingConfig.getSegmentDirectoryLoader());
+    SegmentDirectory segmentDirectory = null;
     try {
-      LoaderUtils.reloadFailureRecovery(indexDir);
-      if (!indexDir.exists()) {
-        LOGGER.info("Segment: {} of table: {} is not found on disk", segmentName, _tableNameWithType);
-        return null;
-      }
-      SegmentMetadataImpl localMetadata = new SegmentMetadataImpl(indexDir);
-      LOGGER.info("Segment: {} of table: {} with crc: {} from disk is ready for loading", segmentName,
-          _tableNameWithType, localMetadata.getCrc());
-      return localMetadata;
+      // Upload the processed segment to server tier backend, which can be local or remote.
+      segmentLoader.upload(indexDir, segmentLoaderContext);
+      // Create the SegmentDirectory object with the newly processed segment from tier backend.
+      segmentDirectory = segmentLoader.load(indexDir.toURI(), segmentLoaderContext);
+      addSegment(ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig, schema));
     } catch (Exception e) {
-      LOGGER.error("Failed to recover segment: {} of table: {} from disk", segmentName, _tableNameWithType, e);
-      FileUtils.deleteQuietly(indexDir);
-      return null;
+      closeSegmentDirectoryQuietly(segmentDirectory);
+      LOGGER.warn("Failed to load newly processed segment: {} of table: {}", segmentName, _tableNameWithType, e);

Review comment:
       Why not log this at `error` level, since an exception is thrown here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org