Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/18 00:54:01 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Jackie-Jiang commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r456725078



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {

Review comment:
       Perform null check within the request?

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -108,7 +120,9 @@ public static SegmentLineage fromZNRecord(ZNRecord record) {
       String lineageId = listField.getKey();
       List<String> value = listField.getValue();
       Preconditions.checkState(value.size() == 4);
-      List<String> segmentsFrom = Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
+      String segmentsFromStr = value.get(0);
+      List<String> segmentsFrom = (segmentsFromStr == null || segmentsFromStr.length() == 0) ? new ArrayList<>()
+          : Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
       List<String> segmentsTo = Arrays.asList(value.get(1).split(COMMA_SEPARATOR));

Review comment:
       Can `segmentsFromStr` be null?
   Also avoid using the regex split
   ```suggestion
         List<String> segmentsFrom = Arrays.asList(StringUtils.split(value.get(0), ','));
         List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1), ','));
   ```
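
   To illustrate why the empty case matters, here is a minimal JDK-only sketch. It shows that `String.split` on an empty string yields one empty element rather than an empty array, so an empty `segmentsFrom` would be read back as a list holding a single empty segment name. The `SplitSketch` class and its `splitOnComma` helper are hypothetical and written only for this example. The helper approximates a non-regex utility split (empty tokens skipped, empty list for null or empty input) and is not the Commons Lang implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SplitSketch {
  // Hypothetical helper: splits on ',' without regex and skips empty tokens.
  // Returns an empty list for null or empty input.
  static List<String> splitOnComma(String value) {
    List<String> tokens = new ArrayList<>();
    if (value == null || value.isEmpty()) {
      return tokens;
    }
    int start = 0;
    for (int i = 0; i <= value.length(); i++) {
      // A token ends at each comma and at the end of the input.
      if (i == value.length() || value.charAt(i) == ',') {
        if (i > start) {
          tokens.add(value.substring(start, i));
        }
        start = i + 1;
      }
    }
    return tokens;
  }

  public static void main(String[] args) {
    // String.split on an empty string returns one element: the empty string itself.
    System.out.println(Arrays.asList("".split(",")).size());
    System.out.println(splitOnComma(""));
    System.out.println(splitOnComma("seg_0,seg_1"));
  }
}
```

   Whether the stored value can actually be null depends on how the ZNRecord list field was written, which this sketch does not model.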

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartBatchUploadRequest.java
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+public class StartBatchUploadRequest {
+  private List<String> _segmentsFrom;
+  private List<String> _segmentsTo;
+
+  public StartBatchUploadRequest(@JsonProperty("segmentsFrom") List<String> segmentsFrom,

Review comment:
       Put default or check they cannot be null?
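
   One possible reading of this suggestion, as a sketch only: default the optional `segmentsFrom` to an empty list and reject a missing `segmentsTo` in the constructor. The class name `StartBatchUploadRequestSketch` is hypothetical, and the Jackson annotations from the diff are omitted so the example compiles with the JDK alone. Whether to default or to reject each field is the author's call.

```java
import java.util.Collections;
import java.util.List;

final class StartBatchUploadRequestSketch {
  private final List<String> _segmentsFrom;
  private final List<String> _segmentsTo;

  StartBatchUploadRequestSketch(List<String> segmentsFrom, List<String> segmentsTo) {
    // Assumption: segmentsFrom is optional (initial upload), so null becomes an empty list.
    _segmentsFrom = (segmentsFrom == null) ? Collections.<String>emptyList() : segmentsFrom;
    // Assumption: segmentsTo is required, so null or empty is rejected up front.
    if (segmentsTo == null || segmentsTo.isEmpty()) {
      throw new IllegalArgumentException("'segmentsTo' cannot be null or empty");
    }
    _segmentsTo = segmentsTo;
  }

  List<String> getSegmentsFrom() {
    return _segmentsFrom;
  }

  List<String> getSegmentsTo() {
    return _segmentsTo;
  }
}
```

   With the checks in the request object, the resource manager method would no longer need its own null handling for these two arguments.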

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +459,45 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)

Review comment:
       You can directly put `StartBatchUploadRequest` as the parameter
   ```suggestion
         @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, StartBatchUploadRequest request)
   ```

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -70,6 +71,17 @@ public String addLineageEntry(LineageEntry lineageEntry) {
     return lineageId;
   }
 
+  /**
+   * Add lineage entry to the segment lineage metadata with the given lineage entry id
+   * @param lineageEntryId the id for the lineage entry
+   * @param lineageEntry a lineage entry
+   * @return the id for the input lineage entry for the access
+   */
+  public String addLineageEntry(String lineageEntryId, LineageEntry lineageEntry) {

Review comment:
       Keep only `String addLineageEntry(LineageEntry lineageEntry)` or `void addLineageEntry(String lineageEntryId, LineageEntry lineageEntry)`; don't keep both because it will be confusing. In other words, always generate the UUID either inside or outside of this class.
   
   My suggestion is to keep `String addLineageEntry(LineageEntry lineageEntry)`; you can check for the existence of the `lineageId` (or not, because the chance of a UUID collision is very low).
   
   In order to update the lineage entry, you can add a method `void updateLineageEntry(String lineageEntryId, LineageEntry lineageEntry)`
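
   A rough sketch of the API shape proposed above, using a simplified stand-in rather than the real `SegmentLineage`. The class name `LineageSketch` is hypothetical, entries are plain strings instead of `LineageEntry` objects, and failing on an unknown id in `updateLineageEntry` is an assumption of this example, not something the comment specifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

final class LineageSketch {
  // Stand-in storage: lineage entry id -> entry (a String here, a LineageEntry in the real class).
  private final Map<String, String> _entries = new HashMap<>();

  // The id is always generated inside the class and handed back to the caller.
  String addLineageEntry(String lineageEntry) {
    String lineageEntryId = UUID.randomUUID().toString();
    _entries.put(lineageEntryId, lineageEntry);
    return lineageEntryId;
  }

  // Replaces an existing entry. Assumption: an unknown id is treated as a caller error.
  void updateLineageEntry(String lineageEntryId, String lineageEntry) {
    if (!_entries.containsKey(lineageEntryId)) {
      throw new IllegalArgumentException("Unknown lineage entry id: " + lineageEntryId);
    }
    _entries.put(lineageEntryId, lineageEntry);
  }

  String getLineageEntry(String lineageEntryId) {
    return _entries.get(lineageEntryId);
  }
}
```

   With this split, the start phase would call `addLineageEntry` and use the returned id as the batch id, and the end phase would call `updateLineageEntry` with that id.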

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {

Review comment:
       For readability
   ```suggestion
             Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), ...)
   ```
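
   For reference, a small JDK-only sketch showing that the stream-based overlap test in the diff and a negated `Collections.disjoint` call give the same answer. The class and method names are hypothetical, and the Guava `Preconditions` wrapper from the suggestion is left out so the example has no external dependency.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class DisjointSketch {
  // Form used in the diff: true when the two lists share at least one element.
  static boolean overlapsWithStream(List<String> existing, List<String> requested) {
    return existing.stream().anyMatch(requested::contains);
  }

  // Form from the suggestion: disjoint means "no common element", so overlap is its negation.
  static boolean overlapsWithDisjoint(List<String> existing, List<String> requested) {
    return !Collections.disjoint(existing, requested);
  }

  public static void main(String[] args) {
    List<String> existing = Arrays.asList("seg_0", "seg_1");
    List<String> requested = Arrays.asList("seg_1", "seg_2");
    System.out.println(overlapsWithStream(existing, requested));
    System.out.println(overlapsWithDisjoint(existing, requested));
  }
}
```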

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {

Review comment:
       Can be simplified with `Preconditions.checkArgument()`, same for other validation checks
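
   A sketch of what the simplified check could look like. To stay JDK-only, the example defines a local `checkArgument` helper that mimics the shape of Guava's `Preconditions.checkArgument(boolean, String, Object...)`; the helper, the class name `CheckArgumentSketch`, and the `validateSegmentsFrom` method are hypothetical. Note that Guava's message template supports only `%s` placeholders, which is all this example uses.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CheckArgumentSketch {
  // Local stand-in for Guava's Preconditions.checkArgument: throws when the condition is false.
  static void checkArgument(boolean condition, String template, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, args));
    }
  }

  // One statement replaces the if / String.format / throw block from the diff.
  static void validateSegmentsFrom(String tableNameWithType, Set<String> segmentsForTable,
      List<String> segmentsFrom) {
    checkArgument(segmentsForTable.containsAll(segmentsFrom),
        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s')",
        tableNameWithType, segmentsFrom);
  }

  public static void main(String[] args) {
    Set<String> segmentsForTable = new HashSet<>(Arrays.asList("seg_0", "seg_1"));
    validateSegmentsFrom("myTable_OFFLINE", segmentsForTable, Arrays.asList("seg_0"));
    System.out.println("validation passed");
  }
}
```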

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segments name cannot be the same for different lineage entry
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {

Review comment:
       Need to check both `Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsTo)` and `Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo)`
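
   A minimal sketch of the combined check, assuming the intent is that a requested `segmentsTo` name must not collide with any segment already recorded on either side of an existing lineage entry. The class `LineageConflictSketch` and its method are hypothetical names for this example.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class LineageConflictSketch {
  // True when any requested target segment name already appears in the existing entry,
  // either as a source segment (segmentsFrom) or as a merged segment (segmentsTo).
  static boolean conflictsWithExistingEntry(List<String> existingSegmentsFrom,
      List<String> existingSegmentsTo, List<String> requestedSegmentsTo) {
    return !Collections.disjoint(existingSegmentsFrom, requestedSegmentsTo)
        || !Collections.disjoint(existingSegmentsTo, requestedSegmentsTo);
  }

  public static void main(String[] args) {
    List<String> existingFrom = Arrays.asList("seg_0", "seg_1");
    List<String> existingTo = Arrays.asList("merged_0");
    // "seg_0" is already a source segment of the existing entry, so this is a conflict.
    System.out.println(conflictsWithExistingEntry(existingFrom, existingTo, Arrays.asList("seg_0")));
  }
}
```

   Checking only the `segmentsTo` side, as the diff does, would let the first case in `main` through.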

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/BatchId.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+public class BatchId {

Review comment:
       Any reason for introducing this object? I feel it would be easier to use a plain string batch id directly instead of a JSON batch id.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {

Review comment:
       Can be simplified with `Preconditions.checkArgument()`

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +459,45 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)
+      throws IOException {
+    StartBatchUploadRequest request = JsonUtils.stringToObject(body, StartBatchUploadRequest.class);
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+    try {
+      String batchId = _pinotHelixResourceManager
+          .startBatchUpload(tableNameWithType, request.getSegmentsFrom(), request.getSegmentsTo());
+      return Response.ok(new BatchId(batchId)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @POST
+  @Path("segments/{tableName}/endBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "End the batch upload", notes = "End the batch upload")
+  public Response endBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)

Review comment:
       Send batchId as query param?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segments name cannot be the same for different lineage entry
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                    + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+                lineageEntry.getSegmentsTo(), segmentsTo);
+            throw new IllegalArgumentException(errorMsg);
+          }
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param batchId
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    // Check that the batch id is valid
+    if (batchId == null || batchId.isEmpty()) {
+      throw new IllegalArgumentException("'batchId' cannot be null or empty");
+    }
+
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          String errorMsg = String
+              .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                  batchId);
+          throw new IllegalArgumentException(errorMsg);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        if (lineageEntry == null) {
+          String errorMsg =
+              String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        if (!segmentsForTable.containsAll(lineageEntry.getSegmentsTo())) {
+          String errorMsg = String.format(
+              "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                  + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // NO-OPS if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.info("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",

Review comment:
       Probably WARN? This is unexpected. Also, move this in front of the segments check
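
   A sketch of the reordering, under the assumption that an already completed entry should short-circuit before the table segments are inspected. The class `EndBatchUploadSketch`, its `State` enum, and the `shouldMarkCompleted` method are hypothetical simplifications; logging is reduced to a comment so the example needs only the JDK.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class EndBatchUploadSketch {
  enum State { IN_PROGRESS, COMPLETED }

  // Returns true when the entry should be updated to COMPLETED, false when there is nothing to do.
  static boolean shouldMarkCompleted(State state, Set<String> segmentsForTable, List<String> segmentsTo) {
    // The state check comes first: a completed entry is a no-op regardless of the segments.
    if (state == State.COMPLETED) {
      // Unexpected situation; a real implementation would log this at WARN level.
      return false;
    }
    // Only an in-progress entry needs all merged segments to be present in the table.
    if (!segmentsForTable.containsAll(segmentsTo)) {
      throw new IllegalArgumentException("Not all segments from 'segmentsTo' are available in the table");
    }
    return true;
  }

  public static void main(String[] args) {
    Set<String> segmentsForTable = new HashSet<>(Arrays.asList("merged_0"));
    System.out.println(shouldMarkCompleted(State.IN_PROGRESS, segmentsForTable, Arrays.asList("merged_0")));
    System.out.println(shouldMarkCompleted(State.COMPLETED, segmentsForTable, Arrays.asList("merged_9")));
  }
}
```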

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that no segment from 'segmentsFrom' already appears in 'segmentsFrom' of an existing entry.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segment names are not reused across different lineage entries
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                    + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+                lineageEntry.getSegmentsTo(), segmentsTo);
+            throw new IllegalArgumentException(errorMsg);
+          }
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write the updated segment lineage back to the property store
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only a successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param batchId Batch id returned by startBatchUpload
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    // Check that the batch id is valid
+    if (batchId == null || batchId.isEmpty()) {
+      throw new IllegalArgumentException("'batchId' cannot be null or empty");
+    }
+
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          String errorMsg = String
+              .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                  batchId);
+          throw new IllegalArgumentException(errorMsg);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        if (lineageEntry == null) {
+          String errorMsg =
+              String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        if (!segmentsForTable.containsAll(lineageEntry.getSegmentsTo())) {
+          String errorMsg = String.format(
+              "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                  + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // No-op if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.info("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",
+              tableNameWithType, batchId);
+          return true;
+        }
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED,
+                System.currentTimeMillis());
+        segmentLineage.addLineageEntry(batchId, newLineageEntry);
+
+        // Write the updated segment lineage back to the property store
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, batchId = %s)", tableNameWithType, batchId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }

Review comment:
       Log something after it succeeds?
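
One possible shape for this, mirroring the success log at the end of startBatchUpload, is sketched below. This is only an illustration under stated assumptions: the class name EndBatchUploadLogSketch and the helper successMessage are hypothetical, and java.util.logging stands in for the class's real LOGGER so that the snippet compiles on its own. In the PR itself this would likely be a single LOGGER.info(...) call placed after the try/catch block.

```java
import java.util.logging.Logger;

// Hypothetical stand-alone sketch; not part of the PR.
class EndBatchUploadLogSketch {
  // Stand-in for the real LOGGER; java.util.logging is used only to keep the sketch self-contained.
  private static final Logger LOGGER = Logger.getLogger(EndBatchUploadLogSketch.class.getName());

  // Hypothetical helper: builds the success message in the same style as startBatchUpload.
  static String successMessage(String tableNameWithType, String batchId) {
    return String.format("endBatchUpload is successfully processed. (tableNameWithType = %s, batchId = %s)",
        tableNameWithType, batchId);
  }

  static void endBatchUpload(String tableNameWithType, String batchId) {
    // ... validation and the retried read-modify-write from the PR would run here ...

    // Only a successful attempt can reach here, so the log doubles as a success marker.
    LOGGER.info(successMessage(tableNameWithType, batchId));
  }

  public static void main(String[] args) {
    endBatchUpload("myTable_OFFLINE", "batch-1");
  }
}
```

Note that the "already COMPLETED" no-op path returns true from the retry block, so a log placed here would also be emitted in that case, right after the existing "Nothing to update" message.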



