You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2022/02/07 23:04:54 UTC

[pinot] branch master updated: add auth token for segment replace rest APIs (#8146)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 08f0f32  add auth token for segment replace rest APIs (#8146)
08f0f32 is described below

commit 08f0f32501602c3c74f8f70a124f0190423e43c0
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Mon Feb 7 15:04:28 2022 -0800

    add auth token for segment replace rest APIs (#8146)
    
    Add auth token param for segment replace rest APIs, as done for the other rest APIs in FileUploadDownloadClient
---
 .../common/utils/FileUploadDownloadClient.java     | 22 ++++++++++++++++------
 .../BaseMultipleSegmentsConversionExecutor.java    |  7 +++----
 .../minion/tasks/SegmentConversionUtils.java       | 10 ++++++----
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 0cb747c..fff6637 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -436,17 +436,24 @@ public class FileUploadDownloadClient implements Closeable {
     return requestBuilder.build();
   }
 
-  private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs) {
+  private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs,
+      @Nullable String authToken) {
     RequestBuilder requestBuilder =
         RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1).setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE)
             .setEntity(new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON));
+    if (StringUtils.isNotBlank(authToken)) {
+      requestBuilder.addHeader("Authorization", authToken);
+    }
     setTimeout(requestBuilder, socketTimeoutMs);
     return requestBuilder.build();
   }
 
-  private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs) {
+  private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, @Nullable String authToken) {
     RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
         .setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
+    if (StringUtils.isNotBlank(authToken)) {
+      requestBuilder.addHeader("Authorization", authToken);
+    }
     setTimeout(requestBuilder, socketTimeoutMs);
     return requestBuilder.build();
   }
@@ -1023,14 +1030,16 @@ public class FileUploadDownloadClient implements Closeable {
    *
    * @param uri URI
    * @param startReplaceSegmentsRequest request
+   * @param authToken auth token
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest)
+  public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest,
+      @Nullable String authToken)
       throws IOException, HttpErrorStatusException {
     return sendRequest(getStartReplaceSegmentsRequest(uri, JsonUtils.objectToString(startReplaceSegmentsRequest),
-        DEFAULT_SOCKET_TIMEOUT_MS));
+        DEFAULT_SOCKET_TIMEOUT_MS, authToken));
   }
 
   /**
@@ -1038,13 +1047,14 @@ public class FileUploadDownloadClient implements Closeable {
    *
    * @param uri URI
    * @param socketTimeoutMs Socket timeout in milliseconds
+   * @param authToken auth token
    * @return Response
    * @throws IOException
    * @throws HttpErrorStatusException
    */
-  public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs)
+  public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable String authToken)
       throws IOException, HttpErrorStatusException {
-    return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs));
+    return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authToken));
   }
 
   /**
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index cc9370d..8860728 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -176,7 +176,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         List<String> segmentsTo =
             segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
         lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
-            new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+            new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken);
       }
 
       // Upload the tarred segments
@@ -213,9 +213,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
       // Update the segment lineage to indicate that the segment replacement is done.
       if (replaceSegmentsEnabled) {
-        SegmentConversionUtils
-            .endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
-                _minionConf.getEndReplaceSegmentsTimeoutMs());
+        SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
+            _minionConf.getEndReplaceSegmentsTimeoutMs(), authToken);
       }
 
       String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index 697dedb..a032d44 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.net.URI;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
 import org.apache.http.Header;
 import org.apache.http.HttpHeaders;
@@ -122,7 +123,7 @@ public class SegmentConversionUtils {
   }
 
   public static String startSegmentReplace(String tableNameWithType, String uploadURL,
-      StartReplaceSegmentsRequest startReplaceSegmentsRequest)
+      StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken)
       throws Exception {
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -130,7 +131,8 @@ public class SegmentConversionUtils {
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
       URI uri =
           FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true);
-      SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest);
+      SimpleHttpResponse response =
+          fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken);
       String responseString = response.getResponse();
       LOGGER.info(
           "Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}",
@@ -140,7 +142,7 @@ public class SegmentConversionUtils {
   }
 
   public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId,
-      int socketTimeoutMs)
+      int socketTimeoutMs, @Nullable String authToken)
       throws Exception {
     String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
@@ -148,7 +150,7 @@ public class SegmentConversionUtils {
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
       URI uri = FileUploadDownloadClient
           .getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId);
-      SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
+      SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken);
       LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}",
           response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL);
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org