You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/02/10 08:51:21 UTC

[pinot] branch master updated: [minion] check segment existency and fail early (#10261)

This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c3e3fead [minion] check segment existency and fail early (#10261)
d3c3e3fead is described below

commit d3c3e3fead64a999b5560067f896e6a25f3a46d6
Author: Haitao Zhang <ha...@startree.ai>
AuthorDate: Fri Feb 10 00:51:12 2023 -0800

    [minion] check segment existency and fail early (#10261)
    
    * check segment existency and fail early
    
    * fix compatability issue
    
    * address comments
---
 .../common/utils/FileUploadDownloadClient.java     | 57 ++++++++++++++-
 .../FileUploadDownloadClientWithoutServerTest.java | 41 +++++++++++
 .../BaseMultipleSegmentsConversionExecutor.java    | 29 +++++---
 .../minion/tasks/SegmentConversionUtils.java       | 31 +++++++++
 .../minion/tasks/SegmentConversionUtilsTest.java   | 80 ++++++++++++++++++++++
 5 files changed, 227 insertions(+), 11 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 9a07435bf1..f78e3b4818 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -137,6 +137,33 @@ public class FileUploadDownloadClient implements AutoCloseable {
     return _httpClient;
   }
 
+  /**
+   * Extracts base URI from a URI, e.g., http://example.com:8000/a/b -> http://example.com:8000
+   * @param fullURI a full URI, possibly with a path and query, e.g., http://example.com:8000/a/b?c=d
+   * @return a URI
+   * @throws URISyntaxException when there are problems generating the URI
+   */
+  public static URI extractBaseURI(URI fullURI)
+      throws URISyntaxException {
+    return getURI(fullURI.getScheme(), fullURI.getHost(), fullURI.getPort());
+  }
+
+  /**
+   * Generates a URI from the given protocol, host and port
+   * @param protocol the protocol part of the URI
+   * @param host the host part of the URI
+   * @param port the port part of the URI
+   * @return a URI
+   * @throws URISyntaxException when there are problems generating the URI
+   */
+  public static URI getURI(String protocol, String host, int port)
+      throws URISyntaxException {
+    if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
+      throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));
+    }
+    return new URI(protocol, null, host, port, null, null, null);
+  }
+
   public static URI getURI(String protocol, String host, int port, String path)
       throws URISyntaxException {
     if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
@@ -783,9 +810,32 @@ public class FileUploadDownloadClient implements AutoCloseable {
   /**
    * Returns a map from a given tableType to a list of segments for that given tableType (OFFLINE or REALTIME)
    * If tableType is left unspecified, both OFFLINE and REALTIME segments will be returned in the map.
+   * @param controllerBaseUri the base controller URI, e.g., https://example.com:8000
+   * @param rawTableName the raw table name without table type
+   * @param tableType the table type (OFFLINE or REALTIME)
+   * @param excludeReplacedSegments whether to exclude replaced segments (determined by segment lineage)
+   * @return a map from a given tableType to a list of segment names
+   * @throws Exception when failed to get segments from the controller
+   */
+  public Map<String, List<String>> getSegments(URI controllerBaseUri, String rawTableName,
+      @Nullable TableType tableType, boolean excludeReplacedSegments)
+      throws Exception {
+    return getSegments(controllerBaseUri, rawTableName, tableType, excludeReplacedSegments, null);
+  }
+
+  /**
+   * Returns a map from a given tableType to a list of segments for that given tableType (OFFLINE or REALTIME)
+   * If tableType is left unspecified, both OFFLINE and REALTIME segments will be returned in the map.
+   * @param controllerBaseUri the base controller URI, e.g., https://example.com:8000
+   * @param rawTableName the raw table name without table type
+   * @param tableType the table type (OFFLINE or REALTIME)
+   * @param excludeReplacedSegments whether to exclude replaced segments (determined by segment lineage)
+   * @param authProvider the {@link AuthProvider}
+   * @return a map from a given tableType to a list of segment names
+   * @throws Exception when failed to get segments from the controller
    */
-  public Map<String, List<String>> getSegments(URI controllerUri, String rawTableName, @Nullable TableType tableType,
-      boolean excludeReplacedSegments)
+  public Map<String, List<String>> getSegments(URI controllerBaseUri, String rawTableName,
+      @Nullable TableType tableType, boolean excludeReplacedSegments, @Nullable AuthProvider authProvider)
       throws Exception {
     List<String> tableTypes;
     if (tableType == null) {
@@ -794,13 +844,14 @@ public class FileUploadDownloadClient implements AutoCloseable {
       tableTypes = Arrays.asList(tableType.toString());
     }
     ControllerRequestURLBuilder controllerRequestURLBuilder =
-        ControllerRequestURLBuilder.baseUrl(controllerUri.toString());
+        ControllerRequestURLBuilder.baseUrl(controllerBaseUri.toString());
     Map<String, List<String>> tableTypeToSegments = new HashMap<>();
     for (String tableTypeToFilter : tableTypes) {
       tableTypeToSegments.put(tableTypeToFilter, new ArrayList<>());
       String uri =
           controllerRequestURLBuilder.forSegmentListAPI(rawTableName, tableTypeToFilter, excludeReplacedSegments);
       RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
+      AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
       HttpClient.setTimeout(requestBuilder, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
       RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0).attempt(() -> {
         try {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientWithoutServerTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientWithoutServerTest.java
new file mode 100644
index 0000000000..34426e9574
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientWithoutServerTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FileUploadDownloadClientWithoutServerTest {
+  @Test
+  public void testExtractBaseURI()
+      throws URISyntaxException {
+    Assert.assertEquals(FileUploadDownloadClient.extractBaseURI(new URI("http://example.com:8000/a/b?c=d")),
+        new URI("http://example.com:8000"));
+  }
+
+  @Test
+  public void testGetURI()
+      throws URISyntaxException {
+    Assert.assertEquals(FileUploadDownloadClient.getURI("http", "example.com", 8000),
+        new URI("http://example.com:8000"));
+  }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index c5b5f2b27a..a3dfbc5e93 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -25,10 +25,13 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.http.Header;
@@ -154,25 +157,35 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
   @Override
   public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
       throws Exception {
-    preProcess(pinotTaskConfig);
-    _pinotTaskConfig = pinotTaskConfig;
-    _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
-    String taskType = pinotTaskConfig.getTaskType();
+    // check whether all segments to process exist in the table, if not, terminate early to avoid wasting computing
+    // resources
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
     String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
-    String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
-    String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
     String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
     AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+    Set<String> segmentNamesForTable = SegmentConversionUtils.getSegmentNamesForTable(tableNameWithType,
+        FileUploadDownloadClient.extractBaseURI(new URI(uploadURL)), authProvider);
+    Set<String> nonExistingSegmentNames =
+        new HashSet<>(Arrays.asList(inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR)));
+    nonExistingSegmentNames.removeAll(segmentNamesForTable);
+    if (!CollectionUtils.isEmpty(nonExistingSegmentNames)) {
+      throw new RuntimeException(String.format("table: %s does have the following segments to process: %s",
+          tableNameWithType, nonExistingSegmentNames));
+    }
+
+    preProcess(pinotTaskConfig);
 
+    _pinotTaskConfig = pinotTaskConfig;
+    _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
+    String taskType = pinotTaskConfig.getTaskType();
+    String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
+    String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
     LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
         tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
-
     File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
     Preconditions.checkState(tempDataDir.mkdirs());
     String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
-
     try {
       List<File> inputSegmentDirs = new ArrayList<>();
       for (int i = 0; i < downloadURLs.length; i++) {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index ad2199bb60..1198044fb4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -21,10 +21,14 @@ package org.apache.pinot.plugin.minion.tasks;
 import com.google.common.net.InetAddresses;
 import java.io.File;
 import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import javax.net.ssl.SSLContext;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.http.Header;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpStatus;
@@ -61,6 +65,33 @@ public class SegmentConversionUtils {
   private SegmentConversionUtils() {
   }
 
+  /**
+   * Gets segment names for the given table
+   * @param tableNameWithType a table name with type
+   * @param controllerBaseURI the controller base URI
+   * @param authProvider a {@link AuthProvider}
+   * @return a set of segment names
+   * @throws Exception when there are exceptions getting segment names for the given table
+   */
+  public static Set<String> getSegmentNamesForTable(String tableNameWithType, URI controllerBaseURI,
+      @Nullable AuthProvider authProvider)
+      throws Exception {
+    String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    SSLContext sslContext = MinionContext.getInstance().getSSLContext();
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
+      Map<String, List<String>> tableTypeToSegmentNames =
+          fileUploadDownloadClient.getSegments(controllerBaseURI, rawTableName, tableType, true, authProvider);
+      if (tableTypeToSegmentNames != null && !tableTypeToSegmentNames.isEmpty()) {
+        List<String> allSegmentNameList = tableTypeToSegmentNames.get(tableType.toString());
+        if (!CollectionUtils.isEmpty(allSegmentNameList)) {
+          return new HashSet<>(allSegmentNameList);
+        }
+      }
+      return Collections.emptySet();
+    }
+  }
+
   public static void uploadSegment(Map<String, String> configs, List<Header> httpHeaders,
       List<NameValuePair> parameters, String tableNameWithType, String segmentName, String uploadURL, File fileToUpload)
       throws Exception {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtilsTest.java
new file mode 100644
index 0000000000..bac8486e0c
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtilsTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.sun.net.httpserver.HttpServer;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Random;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class SegmentConversionUtilsTest {
+  private static final String TEST_TABLE_WITHOUT_TYPE = "myTable";
+  private static final String TEST_TABLE_TYPE = "REALTIME";
+
+  private static final String TEST_TABLE_SEGMENT_1 = "myTable_REALTIME_segment_1";
+
+  private static final String TEST_TABLE_SEGMENT_2 = "myTable_REALTIME_segment_2";
+  private static final String TEST_TABLE_SEGMENT_3 = "myTable_REALTIME_segment_3";
+  private static final String TEST_SCHEME = "http";
+  private static final String TEST_HOST = "localhost";
+  private static final int TEST_PORT = new Random().nextInt(10000) + 10000;
+  private HttpServer _testServer;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    _testServer = HttpServer.create(new InetSocketAddress(TEST_PORT), 0);
+    _testServer.createContext("/segments/myTable", exchange -> {
+      String response = JsonUtils.objectToString(
+          ImmutableList.of(
+              ImmutableMap.of(TEST_TABLE_TYPE, ImmutableList.of(TEST_TABLE_SEGMENT_1, TEST_TABLE_SEGMENT_2))));
+      exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length());
+      OutputStream os = exchange.getResponseBody();
+      os.write(response.getBytes());
+      os.close();
+    });
+    _testServer.setExecutor(null); // creates a default executor
+    _testServer.start();
+  }
+
+  @Test
+  public void testNonExistentSegments()
+      throws Exception {
+    Assert.assertEquals(
+        SegmentConversionUtils.getSegmentNamesForTable(TEST_TABLE_WITHOUT_TYPE + "_" + TEST_TABLE_TYPE,
+        FileUploadDownloadClient.getURI(TEST_SCHEME, TEST_HOST, TEST_PORT), null),
+        ImmutableSet.of(TEST_TABLE_SEGMENT_1, TEST_TABLE_SEGMENT_2));
+  }
+
+  @AfterClass
+  public void shutDown() {
+    _testServer.stop(0);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org