You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/03/03 15:28:38 UTC

[pinot] 01/01: Allow file overwrite during the segment refresh (#10317)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch 0.10.0-adls-fix
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit c3d57b931cbf9fa768e7c3b16125d8e7879d05c2
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Feb 23 00:40:08 2023 -0800

    Allow file overwrite during the segment refresh (#10317)
---
 .../pinot/plugin/filesystem/ADLSGen2PinotFS.java       | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)

diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index 06fb122042..1b877d1002 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -545,8 +545,22 @@ public class ADLSGen2PinotFS extends BasePinotFS {
     int bytesRead;
     long totalBytesRead = 0;
     byte[] buffer = new byte[BUFFER_SIZE];
-    DataLakeFileClient fileClient =
-        _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri));
+    // TODO: the newer client now has the API 'uploadFromFile' that directly takes the file as an input. We can replace
+    // this upload logic with the 'uploadFromFile'/
+    DataLakeFileClient fileClient;
+    try {
+      fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri));
+    } catch (DataLakeStorageException e) {
+      // If the path already exists, doing nothing and return true
+      if (e.getStatusCode() == ALREADY_EXISTS_STATUS_CODE && e.getErrorCode().equals(PATH_ALREADY_EXISTS_ERROR_CODE)) {
+        LOGGER.info("The destination path already exists and we are overwriting the file (dstUri={})", dstUri);
+        fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri), true);
+      } else {
+        LOGGER.error("Exception thrown while calling copy stream to destination (dstUri={}, errorStatus ={})", dstUri,
+            e.getStatusCode(), e);
+        throw new IOException(e);
+      }
+    }
 
     // Update MD5 metadata
     if (contentMd5 != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org