You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/02/07 23:04:54 UTC
[pinot] branch master updated: add auth token for segment replace rest APIs (#8146)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 08f0f32 add auth token for segment replace rest APIs (#8146)
08f0f32 is described below
commit 08f0f32501602c3c74f8f70a124f0190423e43c0
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Mon Feb 7 15:04:28 2022 -0800
add auth token for segment replace rest APIs (#8146)
Add an auth token parameter to the segment replace REST APIs, as is already done for the other REST APIs in FileUploadDownloadClient.
---
.../common/utils/FileUploadDownloadClient.java | 22 ++++++++++++++++------
.../BaseMultipleSegmentsConversionExecutor.java | 7 +++----
.../minion/tasks/SegmentConversionUtils.java | 10 ++++++----
3 files changed, 25 insertions(+), 14 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 0cb747c..fff6637 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -436,17 +436,24 @@ public class FileUploadDownloadClient implements Closeable {
return requestBuilder.build();
}
- private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs) {
+ private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs,
+ @Nullable String authToken) {
RequestBuilder requestBuilder =
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1).setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE)
.setEntity(new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON));
+ if (StringUtils.isNotBlank(authToken)) {
+ requestBuilder.addHeader("Authorization", authToken);
+ }
setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}
- private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs) {
+ private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, @Nullable String authToken) {
RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
.setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
+ if (StringUtils.isNotBlank(authToken)) {
+ requestBuilder.addHeader("Authorization", authToken);
+ }
setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}
@@ -1023,14 +1030,16 @@ public class FileUploadDownloadClient implements Closeable {
*
* @param uri URI
* @param startReplaceSegmentsRequest request
+ * @param authToken auth token
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
- public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest)
+ public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest,
+ @Nullable String authToken)
throws IOException, HttpErrorStatusException {
return sendRequest(getStartReplaceSegmentsRequest(uri, JsonUtils.objectToString(startReplaceSegmentsRequest),
- DEFAULT_SOCKET_TIMEOUT_MS));
+ DEFAULT_SOCKET_TIMEOUT_MS, authToken));
}
/**
@@ -1038,13 +1047,14 @@ public class FileUploadDownloadClient implements Closeable {
*
* @param uri URI
* @param socketTimeoutMs Socket timeout in milliseconds
+ * @param authToken auth token
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
- public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs)
+ public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable String authToken)
throws IOException, HttpErrorStatusException {
- return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs));
+ return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authToken));
}
/**
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index cc9370d..8860728 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -176,7 +176,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
List<String> segmentsTo =
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
- new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+ new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken);
}
// Upload the tarred segments
@@ -213,9 +213,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
// Update the segment lineage to indicate that the segment replacement is done.
if (replaceSegmentsEnabled) {
- SegmentConversionUtils
- .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
- _minionConf.getEndReplaceSegmentsTimeoutMs());
+ SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
+ _minionConf.getEndReplaceSegmentsTimeoutMs(), authToken);
}
String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 697dedb..a032d44 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.net.URI;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
@@ -122,7 +123,7 @@ public class SegmentConversionUtils {
}
public static String startSegmentReplace(String tableNameWithType, String uploadURL,
- StartReplaceSegmentsRequest startReplaceSegmentsRequest)
+ StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken)
throws Exception {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -130,7 +131,8 @@ public class SegmentConversionUtils {
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
URI uri =
FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true);
- SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest);
+ SimpleHttpResponse response =
+ fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken);
String responseString = response.getResponse();
LOGGER.info(
"Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}",
@@ -140,7 +142,7 @@ public class SegmentConversionUtils {
}
public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId,
- int socketTimeoutMs)
+ int socketTimeoutMs, @Nullable String authToken)
throws Exception {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -148,7 +150,7 @@ public class SegmentConversionUtils {
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
URI uri = FileUploadDownloadClient
.getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId);
- SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
+ SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken);
LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}",
response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org