You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by sn...@apache.org on 2023/02/10 08:51:21 UTC
[pinot] branch master updated: [minion] check segment existency and fail early (#10261)
This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d3c3e3fead [minion] check segment existency and fail early (#10261)
d3c3e3fead is described below
commit d3c3e3fead64a999b5560067f896e6a25f3a46d6
Author: Haitao Zhang <ha...@startree.ai>
AuthorDate: Fri Feb 10 00:51:12 2023 -0800
[minion] check segment existency and fail early (#10261)
* check segment existency and fail early
* fix compatibility issue
* address comments
---
.../common/utils/FileUploadDownloadClient.java | 57 ++++++++++++++-
.../FileUploadDownloadClientWithoutServerTest.java | 41 +++++++++++
.../BaseMultipleSegmentsConversionExecutor.java | 29 +++++---
.../minion/tasks/SegmentConversionUtils.java | 31 +++++++++
.../minion/tasks/SegmentConversionUtilsTest.java | 80 ++++++++++++++++++++++
5 files changed, 227 insertions(+), 11 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
index 9a07435bf1..f78e3b4818 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java
@@ -137,6 +137,33 @@ public class FileUploadDownloadClient implements AutoCloseable {
return _httpClient;
}
+ /**
+ * Extracts base URI from a URI, e.g., http://example.com:8000/a/b -> http://example.com:8000
+ * @param fullURI a full URI with a scheme, host and port; any path, query or fragment is dropped
+ * @return a URI
+ * @throws URISyntaxException when there are problems generating the URI
+ */
+ public static URI extractBaseURI(URI fullURI)
+ throws URISyntaxException {
+ return getURI(fullURI.getScheme(), fullURI.getHost(), fullURI.getPort());
+ }
+
+ /**
+ * Generates a URI from the given protocol, host and port
+ * @param protocol the protocol part of the URI
+ * @param host the host part of the URI
+ * @param port the port part of the URI
+ * @return a URI
+ * @throws URISyntaxException when there are problems generating the URI
+ */
+ public static URI getURI(String protocol, String host, int port)
+ throws URISyntaxException {
+ if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
+ throw new IllegalArgumentException(String.format("Unsupported protocol '%s'", protocol));
+ }
+ return new URI(protocol, null, host, port, null, null, null);
+ }
+
public static URI getURI(String protocol, String host, int port, String path)
throws URISyntaxException {
if (!SUPPORTED_PROTOCOLS.contains(protocol)) {
@@ -783,9 +810,32 @@ public class FileUploadDownloadClient implements AutoCloseable {
/**
* Returns a map from a given tableType to a list of segments for that given tableType (OFFLINE or REALTIME)
* If tableType is left unspecified, both OFFLINE and REALTIME segments will be returned in the map.
+ * @param controllerBaseUri the base controller URI, e.g., https://example.com:8000
+ * @param rawTableName the raw table name without table type
+ * @param tableType the table type (OFFLINE or REALTIME)
+ * @param excludeReplacedSegments whether to exclude replaced segments (determined by segment lineage)
+ * @return a map from a given tableType to a list of segment names
+ * @throws Exception when failed to get segments from the controller
+ */
+ public Map<String, List<String>> getSegments(URI controllerBaseUri, String rawTableName,
+ @Nullable TableType tableType, boolean excludeReplacedSegments)
+ throws Exception {
+ return getSegments(controllerBaseUri, rawTableName, tableType, excludeReplacedSegments, null);
+ }
+
+ /**
+ * Returns a map from a given tableType to a list of segments for that given tableType (OFFLINE or REALTIME)
+ * If tableType is left unspecified, both OFFLINE and REALTIME segments will be returned in the map.
+ * @param controllerBaseUri the base controller URI, e.g., https://example.com:8000
+ * @param rawTableName the raw table name without table type
+ * @param tableType the table type (OFFLINE or REALTIME)
+ * @param excludeReplacedSegments whether to exclude replaced segments (determined by segment lineage)
+ * @param authProvider the {@link AuthProvider}
+ * @return a map from a given tableType to a list of segment names
+ * @throws Exception when failed to get segments from the controller
*/
- public Map<String, List<String>> getSegments(URI controllerUri, String rawTableName, @Nullable TableType tableType,
- boolean excludeReplacedSegments)
+ public Map<String, List<String>> getSegments(URI controllerBaseUri, String rawTableName,
+ @Nullable TableType tableType, boolean excludeReplacedSegments, @Nullable AuthProvider authProvider)
throws Exception {
List<String> tableTypes;
if (tableType == null) {
@@ -794,13 +844,14 @@ public class FileUploadDownloadClient implements AutoCloseable {
tableTypes = Arrays.asList(tableType.toString());
}
ControllerRequestURLBuilder controllerRequestURLBuilder =
- ControllerRequestURLBuilder.baseUrl(controllerUri.toString());
+ ControllerRequestURLBuilder.baseUrl(controllerBaseUri.toString());
Map<String, List<String>> tableTypeToSegments = new HashMap<>();
for (String tableTypeToFilter : tableTypes) {
tableTypeToSegments.put(tableTypeToFilter, new ArrayList<>());
String uri =
controllerRequestURLBuilder.forSegmentListAPI(rawTableName, tableTypeToFilter, excludeReplacedSegments);
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
+ AuthProviderUtils.toRequestHeaders(authProvider).forEach(requestBuilder::addHeader);
HttpClient.setTimeout(requestBuilder, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0).attempt(() -> {
try {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientWithoutServerTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientWithoutServerTest.java
new file mode 100644
index 0000000000..34426e9574
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientWithoutServerTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.utils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class FileUploadDownloadClientWithoutServerTest {
+ @Test
+ public void testExtractBaseURI()
+ throws URISyntaxException {
+ Assert.assertEquals(FileUploadDownloadClient.extractBaseURI(new URI("http://example.com:8000/a/b?c=d")),
+ new URI("http://example.com:8000"));
+ }
+
+ @Test
+ public void testGetURI()
+ throws URISyntaxException {
+ Assert.assertEquals(FileUploadDownloadClient.getURI("http", "example.com", 8000),
+ new URI("http://example.com:8000"));
+ }
+}
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index c5b5f2b27a..a3dfbc5e93 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -25,10 +25,13 @@ import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
@@ -154,25 +157,35 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
@Override
public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig)
throws Exception {
- preProcess(pinotTaskConfig);
- _pinotTaskConfig = pinotTaskConfig;
- _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
- String taskType = pinotTaskConfig.getTaskType();
+ // check whether all segments to process exist in the table, if not, terminate early to avoid wasting computing
+ // resources
Map<String, String> configs = pinotTaskConfig.getConfigs();
String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY);
String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY);
- String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
- String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY);
AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN));
+ Set<String> segmentNamesForTable = SegmentConversionUtils.getSegmentNamesForTable(tableNameWithType,
+ FileUploadDownloadClient.extractBaseURI(new URI(uploadURL)), authProvider);
+ Set<String> nonExistingSegmentNames =
+ new HashSet<>(Arrays.asList(inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR)));
+ nonExistingSegmentNames.removeAll(segmentNamesForTable);
+ if (!CollectionUtils.isEmpty(nonExistingSegmentNames)) {
+ throw new RuntimeException(String.format("table: %s does not have the following segments to process: %s",
+ tableNameWithType, nonExistingSegmentNames));
+ }
+
+ preProcess(pinotTaskConfig);
+ _pinotTaskConfig = pinotTaskConfig;
+ _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId());
+ String taskType = pinotTaskConfig.getTaskType();
+ String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY);
+ String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR);
LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType,
tableNameWithType, inputSegmentNames, downloadURLString, uploadURL);
-
File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID());
Preconditions.checkState(tempDataDir.mkdirs());
String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
-
try {
List<File> inputSegmentDirs = new ArrayList<>();
for (int i = 0; i < downloadURLs.length; i++) {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
index ad2199bb60..1198044fb4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtils.java
@@ -21,10 +21,14 @@ package org.apache.pinot.plugin.minion.tasks;
import com.google.common.net.InetAddresses;
import java.io.File;
import java.net.URI;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpStatus;
@@ -61,6 +65,33 @@ public class SegmentConversionUtils {
private SegmentConversionUtils() {
}
+ /**
+ * Gets segment names for the given table
+ * @param tableNameWithType a table name with type
+ * @param controllerBaseURI the controller base URI
+ * @param authProvider a {@link AuthProvider}
+ * @return a set of segment names
+ * @throws Exception when there are exceptions getting segment names for the given table
+ */
+ public static Set<String> getSegmentNamesForTable(String tableNameWithType, URI controllerBaseURI,
+ @Nullable AuthProvider authProvider)
+ throws Exception {
+ String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
+ TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ SSLContext sslContext = MinionContext.getInstance().getSSLContext();
+ try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
+ Map<String, List<String>> tableTypeToSegmentNames =
+ fileUploadDownloadClient.getSegments(controllerBaseURI, rawTableName, tableType, true, authProvider);
+ if (tableTypeToSegmentNames != null && !tableTypeToSegmentNames.isEmpty()) {
+ List<String> allSegmentNameList = tableTypeToSegmentNames.get(tableType.toString());
+ if (!CollectionUtils.isEmpty(allSegmentNameList)) {
+ return new HashSet<>(allSegmentNameList);
+ }
+ }
+ return Collections.emptySet();
+ }
+ }
+
public static void uploadSegment(Map<String, String> configs, List<Header> httpHeaders,
List<NameValuePair> parameters, String tableNameWithType, String segmentName, String uploadURL, File fileToUpload)
throws Exception {
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtilsTest.java
new file mode 100644
index 0000000000..bac8486e0c
--- /dev/null
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/SegmentConversionUtilsTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.minion.tasks;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.sun.net.httpserver.HttpServer;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.util.Random;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class SegmentConversionUtilsTest {
+ private static final String TEST_TABLE_WITHOUT_TYPE = "myTable";
+ private static final String TEST_TABLE_TYPE = "REALTIME";
+
+ private static final String TEST_TABLE_SEGMENT_1 = "myTable_REALTIME_segment_1";
+
+ private static final String TEST_TABLE_SEGMENT_2 = "myTable_REALTIME_segment_2";
+ private static final String TEST_TABLE_SEGMENT_3 = "myTable_REALTIME_segment_3";
+ private static final String TEST_SCHEME = "http";
+ private static final String TEST_HOST = "localhost";
+ private static final int TEST_PORT = new Random().nextInt(10000) + 10000;
+ private HttpServer _testServer;
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ _testServer = HttpServer.create(new InetSocketAddress(TEST_PORT), 0);
+ _testServer.createContext("/segments/myTable", exchange -> {
+ String response = JsonUtils.objectToString(
+ ImmutableList.of(
+ ImmutableMap.of(TEST_TABLE_TYPE, ImmutableList.of(TEST_TABLE_SEGMENT_1, TEST_TABLE_SEGMENT_2))));
+ exchange.sendResponseHeaders(HttpStatus.SC_OK, response.length());
+ OutputStream os = exchange.getResponseBody();
+ os.write(response.getBytes());
+ os.close();
+ });
+ _testServer.setExecutor(null); // creates a default executor
+ _testServer.start();
+ }
+
+ @Test
+ public void testNonExistentSegments()
+ throws Exception {
+ Assert.assertEquals(
+ SegmentConversionUtils.getSegmentNamesForTable(TEST_TABLE_WITHOUT_TYPE + "_" + TEST_TABLE_TYPE,
+ FileUploadDownloadClient.getURI(TEST_SCHEME, TEST_HOST, TEST_PORT), null),
+ ImmutableSet.of(TEST_TABLE_SEGMENT_1, TEST_TABLE_SEGMENT_2));
+ }
+
+ @AfterClass
+ public void shutDown() {
+ _testServer.stop(0);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org