You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/03/03 15:28:38 UTC
[pinot] 01/01: Allow file overwrite during the segment refresh (#10317)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch 0.10.0-adls-fix
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit c3d57b931cbf9fa768e7c3b16125d8e7879d05c2
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Feb 23 00:40:08 2023 -0800
Allow file overwrite during the segment refresh (#10317)
---
.../pinot/plugin/filesystem/ADLSGen2PinotFS.java | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index 06fb122042..1b877d1002 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -545,8 +545,22 @@ public class ADLSGen2PinotFS extends BasePinotFS {
int bytesRead;
long totalBytesRead = 0;
byte[] buffer = new byte[BUFFER_SIZE];
- DataLakeFileClient fileClient =
- _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri));
+ // TODO: the newer client now has the API 'uploadFromFile' that directly takes the file as an input. We can replace
+ // this upload logic with 'uploadFromFile'.
+ DataLakeFileClient fileClient;
+ try {
+ fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri));
+ } catch (DataLakeStorageException e) {
+ // If the path already exists, re-create the file with the overwrite flag set so the existing file is replaced
+ if (e.getStatusCode() == ALREADY_EXISTS_STATUS_CODE && e.getErrorCode().equals(PATH_ALREADY_EXISTS_ERROR_CODE)) {
+ LOGGER.info("The destination path already exists and we are overwriting the file (dstUri={})", dstUri);
+ fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri), true);
+ } else {
+ LOGGER.error("Exception thrown while calling copy stream to destination (dstUri={}, errorStatus ={})", dstUri,
+ e.getStatusCode(), e);
+ throw new IOException(e);
+ }
+ }
// Update MD5 metadata
if (contentMd5 != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org