You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/10/27 22:48:52 UTC
[pinot] 01/01: Add select segments API
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch add-select-segment-api
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 243613426cbc8c02e0834f9d0c8bb12cfebf60c3
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Oct 27 15:48:20 2021 -0700
Add select segments API
---
.../api/resources/PinotSegmentRestletResource.java | 42 +++++++++++++
.../helix/core/PinotHelixResourceManager.java | 72 ++++++++++++++++++++++
2 files changed, 114 insertions(+)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 2628f01..3900f34 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@@ -53,9 +54,14 @@ import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.helix.ZNRecord;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.utils.SegmentName;
+import org.apache.pinot.common.utils.StringUtil;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.access.AccessType;
@@ -657,6 +663,42 @@ public class PinotSegmentRestletResource {
return segmentsMetadata;
}
+  /**
+   * Lists, per table type, the segments of {@code tableName} whose time range matches [startTimestamp, endTimestamp).
+   * An empty or missing bound is treated as unbounded on that side; with neither bound every segment is listed.
+   * The per-table selection (including lineage filtering) is delegated to
+   * {@code PinotHelixResourceManager#getSegmentsForTableWithTimestamps}.
+   */
+  @GET
+  @Path("segments/{tableName}/select")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the selected segments given the (inclusive) start and (exclusive) end timestamps"
+      + " in milliseconds. If no timestamps are provided, all the segments will be returned.",
+      notes = "Get the selected segments given the start and end timestamps in milliseconds")
+  public List<Map<TableType, List<String>>> getSelectedSegments(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Start timestamp (inclusive)") @QueryParam("startTimestamp") @DefaultValue("")
+          String startTimestampStr,
+      @ApiParam(value = "End timestamp (exclusive)") @QueryParam("endTimestamp") @DefaultValue("")
+          String endTimestampStr,
+      @ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default")
+      @QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) {
+    // A missing/empty bound leaves that side open (Long.MIN_VALUE / Long.MAX_VALUE sentinels).
+    long startTimestamp = Long.MIN_VALUE;
+    if (!Strings.isNullOrEmpty(startTimestampStr)) {
+      startTimestamp = Long.parseLong(startTimestampStr);
+    }
+    long endTimestamp = Long.MAX_VALUE;
+    if (!Strings.isNullOrEmpty(endTimestampStr)) {
+      endTimestamp = Long.parseLong(endTimestampStr);
+    }
+    Preconditions.checkArgument(startTimestamp < endTimestamp,
+        "The value of startTimestamp should be smaller than the one of endTimestamp. Start timestamp: %d. End "
+            + "timestamp: %d", startTimestamp, endTimestamp);
+
+    List<String> matchingTables = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName,
+        Constants.validateTableType(tableTypeStr), LOGGER);
+    List<Map<TableType, List<String>>> segmentsPerTable = new ArrayList<>(matchingTables.size());
+    for (String typedTableName : matchingTables) {
+      TableType typeOfTable = TableNameBuilder.getTableTypeFromTableName(typedTableName);
+      List<String> selected = _pinotHelixResourceManager.getSegmentsForTableWithTimestamps(typedTableName,
+          startTimestamp, endTimestamp, excludeOverlapping);
+      segmentsPerTable.add(Collections.singletonMap(typeOfTable, selected));
+    }
+    return segmentsPerTable;
+  }
+
/**
* This is a helper method to get the metadata for all segments for a given table name.
* @param tableNameWithType name of the table along with its type
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8686671..d2f8633 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -593,6 +593,78 @@ public class PinotHelixResourceManager {
return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
}
+  /**
+   * Returns the segments of the given table whose time range matches the given start and end timestamps, after
+   * filtering them based on the table's segment lineage (when a lineage record exists).
+   *
+   * <p>When neither bound is set ({@code startTimestamp == Long.MIN_VALUE} and
+   * {@code endTimestamp == Long.MAX_VALUE}) all segments are selected without reading their ZK metadata. The
+   * returned list keeps the relative order of {@link #getSegmentsFor(String)}.
+   *
+   * @param tableNameWithType Table name with type suffix
+   * @param startTimestamp start timestamp in milliseconds (inclusive)
+   * @param endTimestamp end timestamp in milliseconds (exclusive)
+   * @param excludeOverlapping whether to exclude the segments overlapping with the timestamps
+   * @return a new mutable list of the selected segment names
+   */
+  public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    List<String> segmentNames = getSegmentsFor(tableNameWithType);
+    List<String> selectedSegments;
+    if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
+      // No time bounds: every segment qualifies. Copy so the lineage filtering below never mutates the list
+      // returned by getSegmentsFor().
+      selectedSegments = new ArrayList<>(segmentNames);
+    } else {
+      String segmentZKMetadataPathPrefix =
+          ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/";
+      List<String> segmentZKMetadataPaths = new ArrayList<>(segmentNames.size());
+      for (String segmentName : segmentNames) {
+        segmentZKMetadataPaths.add(segmentZKMetadataPathPrefix + segmentName);
+      }
+      // Batch read of all segment ZK metadata. The loop below pairs records with segment names by index, i.e. it
+      // relies on one entry per requested path, in request order (null for a missing node).
+      List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
+      selectedSegments = new ArrayList<>();
+      for (int i = 0; i < znRecords.size(); i++) {
+        String segmentName = segmentNames.get(i);
+        if (isSegmentWithinTimeStamps(segmentName, znRecords.get(i), startTimestamp, endTimestamp,
+            excludeOverlapping)) {
+          selectedSegments.add(segmentName);
+        }
+      }
+    }
+
+    // Fetch the segment lineage metadata, and filter segments based on segment lineage. The lineage record may be
+    // absent (e.g. the table has never used segment replacement); then there is nothing to filter, and it must not
+    // be passed to SegmentLineage.fromZNRecord().
+    ZNRecord segmentLineageZNRecord =
+        SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+    if (segmentLineageZNRecord == null) {
+      return selectedSegments;
+    }
+    SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+    Set<String> segmentsToKeep = new HashSet<>(selectedSegments);
+    SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsToKeep, segmentLineage);
+    // Drop the filtered-out segments from the ordered list instead of returning the HashSet's arbitrary order.
+    selectedSegments.retainAll(segmentsToKeep);
+    return selectedSegments;
+  }
+
+  /**
+   * Checks whether the segment's time range matches the range between the start and end timestamps.
+   *
+   * <p>The segment is selected when it lies entirely within [startTimestamp, endTimestamp), rejected when it lies
+   * entirely outside, and otherwise (partial overlap) selected only if {@code excludeOverlapping} is false.
+   *
+   * @param segmentName segment name, used for logging
+   * @param znRecord the ZNRecord associated with the segment name; {@code null} is treated as not selected
+   * @param startTimestamp start timestamp in milliseconds (inclusive)
+   * @param endTimestamp end timestamp in milliseconds (exclusive)
+   * @param excludeOverlapping whether to exclude the segments overlapping with the timestamps
+   * @return whether the segment should be selected
+   */
+  private boolean isSegmentWithinTimeStamps(String segmentName, ZNRecord znRecord, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    if (znRecord == null) {
+      return false;
+    }
+    // The raw START_TIME/END_TIME fields are expressed in the segment's own time unit, which is not necessarily
+    // milliseconds. Let SegmentZKMetadata convert them so they are comparable with the millisecond bounds.
+    // NOTE(review): SegmentZKMetadata reports -1 when the time is not set, matching the previous default.
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
+    long startTimeMsInSegment = segmentZKMetadata.getStartTimeMs();
+    long endTimeMsInSegment = segmentZKMetadata.getEndTimeMs();
+    if (startTimeMsInSegment > endTimeMsInSegment) {
+      LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}", segmentName,
+          startTimeMsInSegment, endTimeMsInSegment);
+      return false;
+    }
+    if (startTimestamp <= startTimeMsInSegment && endTimeMsInSegment < endTimestamp) {
+      // The segment is within the start and end time range.
+      return true;
+    } else if (endTimeMsInSegment < startTimestamp || startTimeMsInSegment >= endTimestamp) {
+      // The segment is outside of the start and end time range.
+      return false;
+    }
+    // If the segment happens to overlap with the start and end time range,
+    // check the excludeOverlapping flag to determine whether to include the segment.
+    return !excludeOverlapping;
+  }
+
@Nullable
public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String segmentName) {
return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org