You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2023/05/18 00:20:36 UTC
[pinot] branch master updated: Minor Pinot Deepstore Upload Retry Task Improvements (#10752)
This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 61d6b1075d Minor Pinot Deepstore Upload Retry Task Improvements (#10752)
61d6b1075d is described below
commit 61d6b1075da2ae73d817c3154f942af2d45021b6
Author: Ankit Sultana <an...@uber.com>
AuthorDate: Thu May 18 05:50:30 2023 +0530
Minor Pinot Deepstore Upload Retry Task Improvements (#10752)
* Use Non-Random Path in Pinot Deepstore Upload Retry
* Add configurable timeout + fix segment name in move
* Fix test
* Proper test which asserts segment file move
* Address feedback
---
.../pinot/common/utils/SimpleHttpErrorInfo.java | 2 ++
.../apache/pinot/controller/ControllerConf.java | 6 ++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 34 ++++++++++++++++++----
.../PinotLLCRealtimeSegmentManagerTest.java | 31 +++++++++++++++-----
.../manager/realtime/PinotFSSegmentUploader.java | 8 ++++-
.../data/manager/realtime/SegmentUploader.java | 11 ++++++-
.../realtime/Server2ControllerSegmentUploader.java | 13 +++++++--
.../pinot/server/api/resources/TablesResource.java | 20 +++++++++++--
8 files changed, 105 insertions(+), 20 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java
index 90597453d5..b424ada19f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SimpleHttpErrorInfo.java
@@ -19,6 +19,7 @@
package org.apache.pinot.common.utils;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -30,6 +31,7 @@ public class SimpleHttpErrorInfo {
private String _error;
@JsonCreator
+ @JsonIgnoreProperties(ignoreUnknown = true)
public SimpleHttpErrorInfo(@JsonProperty("code") int code, @JsonProperty("error") String message) {
_code = code;
_error = message;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 2dda29a5b2..8595a434af 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -213,6 +213,8 @@ public class ControllerConf extends PinotConfiguration {
// Default value is false.
public static final String ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT =
"controller.realtime.segment.deepStoreUploadRetryEnabled";
+ public static final String DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS =
+ "controller.realtime.segment.deepStoreUploadRetry.timeoutMs";
public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
@@ -920,6 +922,10 @@ public class ControllerConf extends PinotConfiguration {
return getProperty(ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, false);
}
+ public int getDeepStoreRetryUploadTimeoutMs() {
+ return getProperty(ControllerPeriodicTasksConf.DEEP_STORE_RETRY_UPLOAD_TIMEOUT_MS, -1);
+ }
+
public long getPinotTaskManagerInitialDelaySeconds() {
return getPeriodicTaskInitialDelayInSeconds();
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 455ff15894..fab1a234b0 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -167,6 +167,7 @@ public class PinotLLCRealtimeSegmentManager {
private final Lock[] _idealStateUpdateLocks;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
+ private final int _deepstoreUploadRetryTimeoutMs;
private final FileUploadDownloadClient _fileUploadDownloadClient;
private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
@@ -191,6 +192,7 @@ public class PinotLLCRealtimeSegmentManager {
}
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
_isDeepStoreLLCSegmentUploadRetryEnabled = controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
+ _deepstoreUploadRetryTimeoutMs = controllerConf.getDeepStoreRetryUploadTimeoutMs();
_fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? initFileUploadDownloadClient() : null;
}
@@ -481,12 +483,10 @@ public class PinotLLCRealtimeSegmentManager {
LOGGER.info("No moving needed for segment on peer servers: {}", segmentLocation);
return;
}
- URI segmentFileURI = URIUtils.getUri(segmentLocation);
+
URI tableDirURI = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName);
- URI uriToMoveTo = URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
PinotFS pinotFS = PinotFSFactory.create(tableDirURI.getScheme());
- Preconditions.checkState(pinotFS.move(segmentFileURI, uriToMoveTo, true),
- "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, uriToMoveTo);
+ String uriToMoveTo = moveSegmentFile(rawTableName, segmentName, segmentLocation, pinotFS);
// Cleans up tmp segment files under table dir.
// We only clean up tmp segment files in table level dir, so there's no need to list recursively.
@@ -502,7 +502,7 @@ public class PinotLLCRealtimeSegmentManager {
} catch (Exception e) {
LOGGER.warn("Caught exception while deleting temporary segment files for segment: {}", segmentName, e);
}
- committingSegmentDescriptor.setSegmentLocation(uriToMoveTo.toString());
+ committingSegmentDescriptor.setSegmentLocation(uriToMoveTo);
}
/**
@@ -1392,6 +1392,7 @@ public class PinotLLCRealtimeSegmentManager {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
+ String rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
// Use this retention value to avoid the data racing between segment upload and retention management.
RetentionStrategy retentionStrategy = null;
@@ -1404,6 +1405,8 @@ public class PinotLLCRealtimeSegmentManager {
retentionMs - MIN_TIME_BEFORE_SEGMENT_EXPIRATION_FOR_FIXING_DEEP_STORE_COPY_MILLIS);
}
+ PinotFS pinotFS = PinotFSFactory.create(URIUtils.getUri(_controllerConf.getDataDir()).getScheme());
+
// Iterate through LLC segments and upload missing deep store copy by following steps:
// 1. Ask servers which have online segment replica to upload to deep store.
// Servers return deep store download url after successful uploading.
@@ -1438,9 +1441,13 @@ public class PinotLLCRealtimeSegmentManager {
// Randomly ask one server to upload
URI uri = peerSegmentURIs.get(RANDOM.nextInt(peerSegmentURIs.size()));
String serverUploadRequestUrl = StringUtil.join("/", uri.toString(), "upload");
+ serverUploadRequestUrl =
+ String.format("%s?uploadTimeoutMs=%d", serverUploadRequestUrl, _deepstoreUploadRetryTimeoutMs);
LOGGER.info("Ask server to upload LLC segment {} to deep store by this path: {}", segmentName,
serverUploadRequestUrl);
- String segmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+ String tempSegmentDownloadUrl = _fileUploadDownloadClient.uploadToSegmentStore(serverUploadRequestUrl);
+ String segmentDownloadUrl =
+ moveSegmentFile(rawTableName, segmentName, tempSegmentDownloadUrl, pinotFS);
LOGGER.info("Updating segment {} download url in ZK to be {}", segmentName, segmentDownloadUrl);
// Update segment ZK metadata by adding the download URL
segmentZKMetadata.setDownloadUrl(segmentDownloadUrl);
@@ -1555,4 +1562,19 @@ public class PinotLLCRealtimeSegmentManager {
Set<String> consumingSegments = findConsumingSegments(idealState);
return new PauseStatus(Boolean.parseBoolean(isTablePausedStr), consumingSegments, null);
}
+
+ @VisibleForTesting
+ String moveSegmentFile(String rawTableName, String segmentName, String segmentLocation, PinotFS pinotFS)
+ throws IOException {
+ URI segmentFileURI = URIUtils.getUri(segmentLocation);
+ URI uriToMoveTo = createSegmentPath(rawTableName, segmentName);
+ Preconditions.checkState(pinotFS.move(segmentFileURI, uriToMoveTo, true),
+ "Failed to move segment file for segment: %s from: %s to: %s", segmentName, segmentLocation, uriToMoveTo);
+ return uriToMoveTo.toString();
+ }
+
+ @VisibleForTesting
+ URI createSegmentPath(String rawTableName, String segmentName) {
+ return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
+ }
}
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index eee1f2a8a8..2223c9e341 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -89,7 +90,12 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
public class PinotLLCRealtimeSegmentManagerTest {
@@ -948,6 +954,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
ControllerConf controllerConfig = new ControllerConf();
controllerConfig.setProperty(
ControllerConf.ControllerPeriodicTasksConf.ENABLE_DEEP_STORE_RETRY_UPLOAD_LLC_SEGMENT, true);
+ controllerConfig.setDataDir(TEMP_DIR.toString());
FakePinotLLCRealtimeSegmentManager segmentManager =
new FakePinotLLCRealtimeSegmentManager(pinotHelixResourceManager, controllerConfig);
Assert.assertTrue(segmentManager.isDeepStoreLLCSegmentUploadRetryEnabled());
@@ -990,11 +997,17 @@ public class PinotLLCRealtimeSegmentManagerTest {
"segments",
REALTIME_TABLE_NAME,
segmentsZKMetadata.get(0).getSegmentName(),
- "upload");
- String segmentDownloadUrl0 = String.format("segmentDownloadUr_%s", segmentsZKMetadata.get(0)
- .getSegmentName());
+ "upload") + "?uploadTimeoutMs=-1";
+ // tempSegmentFileLocation is the location where the segment uploader will upload the segment. This usually ends
+ // with a random UUID
+ File tempSegmentFileLocation = new File(TEMP_DIR, segmentsZKMetadata.get(0).getSegmentName() + UUID.randomUUID());
+ FileUtils.write(tempSegmentFileLocation, "test");
+ // After the deep-store retry task gets the segment location returned by Pinot server, it will move the segment to
+ // its final location. This is the expected segment location.
+ String expectedSegmentLocation = segmentManager.createSegmentPath(RAW_TABLE_NAME,
+ segmentsZKMetadata.get(0).getSegmentName()).toString();
when(segmentManager._mockedFileUploadDownloadClient
- .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(segmentDownloadUrl0);
+ .uploadToSegmentStore(serverUploadRequestUrl0)).thenReturn(tempSegmentFileLocation.getPath());
// Change 2nd segment status to be DONE, but with default peer download url.
// Verify later the download url isn't fixed after upload failure.
@@ -1013,7 +1026,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
"segments",
REALTIME_TABLE_NAME,
segmentsZKMetadata.get(1).getSegmentName(),
- "upload");
+ "upload") + "?uploadTimeoutMs=-1";
when(segmentManager._mockedFileUploadDownloadClient
.uploadToSegmentStore(serverUploadRequestUrl1))
.thenThrow(new HttpErrorStatusException(
@@ -1041,11 +1054,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
when(pinotHelixResourceManager.getTableConfig(REALTIME_TABLE_NAME))
.thenReturn(segmentManager._tableConfig);
+
// Verify the result
segmentManager.uploadToDeepStoreIfMissing(segmentManager._tableConfig, segmentsZKMetadata);
assertEquals(
segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(0), null).getDownloadUrl(),
- segmentDownloadUrl0);
+ expectedSegmentLocation);
+ assertFalse(tempSegmentFileLocation.exists(),
+ "Deep-store retry task should move the file from temp location to permanent location");
+
assertEquals(
segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentNames.get(1), null).getDownloadUrl(),
CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index 9a34872a50..4f173521c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -59,7 +59,13 @@ public class PinotFSSegmentUploader implements SegmentUploader {
_serverMetrics = serverMetrics;
}
+ @Override
public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
+ return uploadSegment(segmentFile, segmentName, _timeoutInMs);
+ }
+
+ @Override
+ public URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis) {
if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) {
LOGGER.error("Missing segment store uri. Failed to upload segment file {} for {}.", segmentFile.getName(),
segmentName.getSegmentName());
@@ -89,7 +95,7 @@ public class PinotFSSegmentUploader implements SegmentUploader {
};
Future<URI> future = _executorService.submit(uploadTask);
try {
- URI segmentLocation = future.get(_timeoutInMs, TimeUnit.MILLISECONDS);
+ URI segmentLocation = future.get(timeoutInMillis, TimeUnit.MILLISECONDS);
LOGGER.info("Successfully upload segment {} to {}.", segmentName, segmentLocation);
_serverMetrics.addMeteredTableValue(rawTableName,
segmentLocation == null ? ServerMeter.SEGMENT_UPLOAD_FAILURE : ServerMeter.SEGMENT_UPLOAD_SUCCESS, 1);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
index 44d0a36067..63bbe99a5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentUploader.java
@@ -24,6 +24,15 @@ import org.apache.pinot.common.utils.LLCSegmentName;
public interface SegmentUploader {
- // Returns the URI of the uploaded segment. null if the upload fails.
+
+ /**
+ * Uploads the given segmentFile to the deep-store. Returns the uploaded segment's URI, or null if the upload fails.
+ */
URI uploadSegment(File segmentFile, LLCSegmentName segmentName);
+
+ /**
+ * Uploads the given segmentFile to the deep-store. Returns the URI where the segment is uploaded, or null if the
+ * upload fails. The upload is bounded by the specified timeout, given in milliseconds.
+ */
+ URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
index 5aa5b8b266..4b00563125 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/Server2ControllerSegmentUploader.java
@@ -64,7 +64,12 @@ public class Server2ControllerSegmentUploader implements SegmentUploader {
@Override
public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
- SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile);
+ return uploadSegment(segmentFile, segmentName, _segmentUploadRequestTimeoutMs);
+ }
+
+ @Override
+ public URI uploadSegment(File segmentFile, LLCSegmentName segmentName, int timeoutInMillis) {
+ SegmentCompletionProtocol.Response response = uploadSegmentToController(segmentFile, timeoutInMillis);
if (response.getStatus() == SegmentCompletionProtocol.ControllerResponseStatus.UPLOAD_SUCCESS) {
try {
URI uri = new URI(response.getSegmentLocation());
@@ -79,12 +84,16 @@ public class Server2ControllerSegmentUploader implements SegmentUploader {
}
public SegmentCompletionProtocol.Response uploadSegmentToController(File segmentFile) {
+ return uploadSegmentToController(segmentFile, _segmentUploadRequestTimeoutMs);
+ }
+
+ private SegmentCompletionProtocol.Response uploadSegmentToController(File segmentFile, int timeoutInMillis) {
SegmentCompletionProtocol.Response response;
long startTime = System.currentTimeMillis();
try {
String responseStr = _fileUploadDownloadClient
.uploadSegment(_controllerSegmentUploadCommitUrl, _segmentName, segmentFile,
- AuthProviderUtils.toRequestHeaders(_authProvider), null, _segmentUploadRequestTimeoutMs).getResponse();
+ AuthProviderUtils.toRequestHeaders(_authProvider), null, timeoutInMillis).getResponse();
response = SegmentCompletionProtocol.Response.fromJsonString(responseStr);
_segmentLogger.info("Controller response {} for {}", response.toJsonString(), _controllerSegmentUploadCommitUrl);
if (response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER)) {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index d9f0b6c76e..e518d2d6b1 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -469,7 +469,14 @@ public class TablesResource {
* when segment store copy is unavailable for committed low level consumer segments.
* Please note that invocation of this endpoint may cause query performance to suffer, since we tar up the segment
* to upload it.
- * @see <a>href="https://tinyurl.com/f63ru4sb</a>
+ *
+ * @see <a href="https://tinyurl.com/f63ru4sb">https://tinyurl.com/f63ru4sb</a>
+ * @param realtimeTableName table name with type.
+ * @param segmentName name of the segment to be uploaded
+ * @param timeoutMs timeout for the segment upload to the deep-store. If this is zero or negative, the default
+ * timeout is used.
+ * @return full url where the segment is uploaded
+ * @throws Exception if an error occurred during the segment upload.
*/
@POST
@Path("/segments/{realtimeTableName}/{segmentName}/upload")
@@ -485,7 +492,8 @@ public class TablesResource {
public String uploadLLCSegment(
@ApiParam(value = "Name of the REALTIME table", required = true) @PathParam("realtimeTableName")
String realtimeTableName,
- @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName)
+ @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") String segmentName,
+ @QueryParam("uploadTimeoutMs") @DefaultValue("-1") int timeoutMs)
throws Exception {
LOGGER.info("Received a request to upload low level consumer segment {} for table {}", segmentName,
realtimeTableName);
@@ -527,7 +535,13 @@ public class TablesResource {
// Use segment uploader to upload the segment tar file to segment store and return the segment download url.
SegmentUploader segmentUploader = _serverInstance.getInstanceDataManager().getSegmentUploader();
- URI segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName));
+ URI segmentDownloadUrl;
+ if (timeoutMs <= 0) {
+ // Use default timeout if passed timeout is not positive
+ segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName));
+ } else {
+ segmentDownloadUrl = segmentUploader.uploadSegment(segmentTarFile, new LLCSegmentName(segmentName), timeoutMs);
+ }
if (segmentDownloadUrl == null) {
throw new WebApplicationException(
String.format("Failed to upload table %s segment %s to segment store", realtimeTableName, segmentName),
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org