You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2021/10/27 22:48:52 UTC

[pinot] 01/01: Add select segments API

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch add-select-segment-api
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 243613426cbc8c02e0834f9d0c8bb12cfebf60c3
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Oct 27 15:48:20 2021 -0700

    Add select segments API
---
 .../api/resources/PinotSegmentRestletResource.java | 42 +++++++++++++
 .../helix/core/PinotHelixResourceManager.java      | 72 ++++++++++++++++++++++
 2 files changed, 114 insertions(+)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 2628f01..3900f34 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -21,6 +21,7 @@ package org.apache.pinot.controller.api.resources;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -53,9 +54,14 @@ import org.apache.commons.httpclient.HttpConnectionManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.lineage.LineageEntry;
+import org.apache.pinot.common.lineage.LineageEntryState;
+import org.apache.pinot.common.lineage.SegmentLineage;
+import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.utils.SegmentName;
+import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.access.AccessType;
@@ -657,6 +663,42 @@ public class PinotSegmentRestletResource {
     return segmentsMetadata;
   }
 
+  /**
+   * Lists, per table type, the segments of the given table whose time range matches the optional
+   * [startTimestamp, endTimestamp) window. An empty bound leaves that side of the window open.
+   */
+  @GET
+  @Path("segments/{tableName}/select")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get the selected segments given the (inclusive) start and (exclusive) end timestamps"
+      + " in milliseconds. If no timestamps are provided, all the segments will be returned.",
+      notes = "Get the selected segments given the start and end timestamps in milliseconds")
+  public List<Map<TableType, List<String>>> getSelectedSegments(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Start timestamp (inclusive)") @QueryParam("startTimestamp") @DefaultValue("")
+          String startTimestampStr,
+      @ApiParam(value = "End timestamp (exclusive)") @QueryParam("endTimestamp") @DefaultValue("")
+          String endTimestampStr,
+      @ApiParam(value = "Whether to exclude the segments overlapping with the timestamps, false by default")
+      @QueryParam("excludeOverlapping") @DefaultValue("false") boolean excludeOverlapping) {
+    // A missing bound falls back to the widest possible value on that side of the window.
+    long startTimestamp = Long.MIN_VALUE;
+    if (!Strings.isNullOrEmpty(startTimestampStr)) {
+      startTimestamp = Long.parseLong(startTimestampStr);
+    }
+    long endTimestamp = Long.MAX_VALUE;
+    if (!Strings.isNullOrEmpty(endTimestampStr)) {
+      endTimestamp = Long.parseLong(endTimestampStr);
+    }
+    Preconditions.checkArgument(startTimestamp < endTimestamp,
+        "The value of startTimestamp should be smaller than the one of endTimestamp. Start timestamp: %d. End "
+            + "timestamp: %d", startTimestamp, endTimestamp);
+
+    TableType requestedTableType = Constants.validateTableType(tableTypeStr);
+    List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
+        tableName, requestedTableType, LOGGER);
+
+    // One single-entry map per existing physical table (OFFLINE and/or REALTIME).
+    List<Map<TableType, List<String>>> segmentsPerTableType = new ArrayList<>(tableNamesWithType.size());
+    for (String physicalTableName : tableNamesWithType) {
+      TableType physicalTableType = TableNameBuilder.getTableTypeFromTableName(physicalTableName);
+      List<String> matchingSegments = _pinotHelixResourceManager.getSegmentsForTableWithTimestamps(physicalTableName,
+          startTimestamp, endTimestamp, excludeOverlapping);
+      segmentsPerTableType.add(Collections.singletonMap(physicalTableType, matchingSegments));
+    }
+    return segmentsPerTableType;
+  }
+
   /**
    * This is a helper method to get the metadata for all segments for a given table name.
    * @param tableNameWithType name of the table along with its type
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8686671..d2f8633 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -593,6 +593,78 @@ public class PinotHelixResourceManager {
     return ZKMetadataProvider.getSegments(_propertyStore, tableNameWithType);
   }
 
+  /**
+   * Returns the segments for the given table based on the start and end timestamp, filtered by the table's segment
+   * lineage (when the table has one).
+   *
+   * @param tableNameWithType  Table name with type suffix
+   * @param startTimestamp  start timestamp in milliseconds (inclusive); {@code Long.MIN_VALUE} means unbounded
+   * @param endTimestamp  end timestamp in milliseconds (exclusive); {@code Long.MAX_VALUE} means unbounded
+   * @param excludeOverlapping  whether to exclude the segments overlapping with the timestamps
+   * @return the selected segment names, in the same relative order as returned by {@link #getSegmentsFor(String)}
+   */
+  public List<String> getSegmentsForTableWithTimestamps(String tableNameWithType, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    List<String> segmentNames = getSegmentsFor(tableNameWithType);
+    List<String> selectedSegments;
+    if (startTimestamp == Long.MIN_VALUE && endTimestamp == Long.MAX_VALUE) {
+      // No start and end timestamp specified: every segment qualifies, so skip reading the segment ZK metadata.
+      selectedSegments = segmentNames;
+    } else {
+      selectedSegments = new ArrayList<>();
+      String segmentZKMetadataPathPrefix =
+          ZKMetadataProvider.constructPropertyStorePathForResource(tableNameWithType) + "/";
+      List<String> segmentZKMetadataPaths = new ArrayList<>(segmentNames.size());
+      for (String segmentName : segmentNames) {
+        segmentZKMetadataPaths.add(segmentZKMetadataPathPrefix + segmentName);
+      }
+      // Batch-read the segment ZK metadata; the result is index-aligned with segmentNames. Null entries are handled
+      // by isSegmentWithinTimeStamps (such segments are not selected).
+      List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
+      for (int i = 0; i < znRecords.size(); i++) {
+        String segmentName = segmentNames.get(i);
+        if (isSegmentWithinTimeStamps(segmentName, znRecords.get(i), startTimestamp, endTimestamp,
+            excludeOverlapping)) {
+          selectedSegments.add(segmentName);
+        }
+      }
+    }
+
+    // Fetch the segment lineage metadata, and filter segments based on segment lineage. Tables that never used
+    // segment replacement have no lineage record, so guard against null before converting it.
+    ZNRecord segmentLineageZNRecord =
+        SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+    if (segmentLineageZNRecord == null) {
+      return new ArrayList<>(selectedSegments);
+    }
+    SegmentLineage segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+    Set<String> segmentsToKeep = new HashSet<>(selectedSegments);
+    SegmentLineageUtils.filterSegmentsBasedOnLineageInPlace(segmentsToKeep, segmentLineage);
+
+    // Rebuild the result from the ordered list so the response order is deterministic (the set is only used for
+    // the lineage filtering).
+    List<String> result = new ArrayList<>(segmentsToKeep.size());
+    for (String segmentName : selectedSegments) {
+      if (segmentsToKeep.contains(segmentName)) {
+        result.add(segmentName);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Checks whether the segment is within the time range between the start and end timestamps.
+   *
+   * <p>A segment fully inside [startTimestamp, endTimestamp) is always selected, a segment fully outside is never
+   * selected, and a partially overlapping segment is selected unless {@code excludeOverlapping} is set. Segments
+   * with no ZK metadata or with an inverted time range are not selected.
+   *
+   * @param segmentName  segment name
+   * @param znRecord  the ZNRecord associated with the segment name (may be null)
+   * @param startTimestamp  start timestamp in milliseconds (inclusive)
+   * @param endTimestamp  end timestamp in milliseconds (exclusive)
+   * @param excludeOverlapping  whether to exclude the segments overlapping with the timestamps
+   * @return true if the segment should be selected
+   */
+  private boolean isSegmentWithinTimeStamps(String segmentName, ZNRecord znRecord, long startTimestamp,
+      long endTimestamp, boolean excludeOverlapping) {
+    if (znRecord == null) {
+      return false;
+    }
+    // The raw START_TIME/END_TIME fields are stored in the segment's own time unit (Segment.TIME_UNIT), not
+    // necessarily milliseconds, so read them through SegmentZKMetadata to get values comparable with the
+    // millisecond timestamps of the request.
+    SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(znRecord);
+    long startTimeMsInSegment = segmentZKMetadata.getStartTimeMs();
+    long endTimeMsInSegment = segmentZKMetadata.getEndTimeMs();
+    if (startTimeMsInSegment > endTimeMsInSegment) {
+      LOGGER.warn("Invalid start and end time for segment: {}. Start time: {}. End time: {}", segmentName,
+          startTimeMsInSegment, endTimeMsInSegment);
+      return false;
+    }
+    if (startTimestamp <= startTimeMsInSegment && endTimeMsInSegment < endTimestamp) {
+      // The segment is within the start and end time range.
+      return true;
+    } else if (endTimeMsInSegment < startTimestamp || startTimeMsInSegment >= endTimestamp) {
+      // The segment is outside of the start and end time range.
+      return false;
+    }
+    // If the segment happens to overlap with the start and end time range,
+    // check the excludeOverlapping flag to determine whether to include the segment.
+    return !excludeOverlapping;
+  }
+
   @Nullable
   public SegmentZKMetadata getSegmentZKMetadata(String tableNameWithType, String segmentName) {
     return ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org