You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/02/23 08:40:15 UTC
[pinot] branch master updated: Allow file overwrite during the segment refresh (#10317)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 1fce07ed77 Allow file overwrite during the segment refresh (#10317)
1fce07ed77 is described below
commit 1fce07ed77ff31af6d3b85a6b03a4b76632ed133
Author: Seunghyun Lee <se...@startree.ai>
AuthorDate: Thu Feb 23 00:40:08 2023 -0800
Allow file overwrite during the segment refresh (#10317)
---
.../pinot/plugin/filesystem/ADLSGen2PinotFS.java | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
diff --git a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
index 8a13265d42..15977a0dd1 100644
--- a/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-adls/src/main/java/org/apache/pinot/plugin/filesystem/ADLSGen2PinotFS.java
@@ -630,8 +630,22 @@ public class ADLSGen2PinotFS extends BasePinotFS {
int bytesRead;
long totalBytesRead = 0;
byte[] buffer = new byte[BUFFER_SIZE];
- DataLakeFileClient fileClient =
- _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri));
+ // TODO: the newer client now has the API 'uploadFromFile' that directly takes the file as an input. We can replace
+ // this upload logic with 'uploadFromFile'.
+ DataLakeFileClient fileClient;
+ try {
+ fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri));
+ } catch (DataLakeStorageException e) {
+ // If the path already exists, re-create the file with overwrite enabled instead of failing
+ if (e.getStatusCode() == ALREADY_EXISTS_STATUS_CODE && e.getErrorCode().equals(PATH_ALREADY_EXISTS_ERROR_CODE)) {
+ LOGGER.info("The destination path already exists and we are overwriting the file (dstUri={})", dstUri);
+ fileClient = _fileSystemClient.createFile(AzurePinotFSUtil.convertUriToUrlEncodedAzureStylePath(dstUri), true);
+ } else {
+ LOGGER.error("Exception thrown while calling copy stream to destination (dstUri={}, errorStatus ={})", dstUri,
+ e.getStatusCode(), e);
+ throw new IOException(e);
+ }
+ }
// Update MD5 metadata
if (contentMd5 != null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org