Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/07/17 16:55:22 UTC

[GitHub] [incubator-pinot] snleee opened a new pull request #5712: Add startBatchUpload, endBatchUpload controller API

snleee opened a new pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712


   These APIs are the building blocks for atomically replacing
   m segments with n segments. The segment selection algorithm will
   follow in the next PR.
   
   1. Added startBatchUpload, endBatchUpload controller API
   2. Added unit tests
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] Jackie-Jiang commented on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662186212


   > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > 
   > > > > 
   > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > 
   > > > 
   > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > 
   > > 
   > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > 
   > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   
   The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to tie the primitives to any specific operation (by taking an extra query param of "operation").




[GitHub] [incubator-pinot] snleee commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662756552


   > > > > > > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > > > > > > 
   > > > > > > > > > > > 
   > > > > > > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > > > > > > 
   > > > > > > > > > > 
   > > > > > > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > > > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > > > > > > 
   > > > > > > > > > 
   > > > > > > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > > > > > > 
   > > > > > > > > 
   > > > > > > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > > > > > > > 
   > > > > > > > 
   > > > > > > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > > > > > > 
   > > > > > > 
   > > > > > > I think that the confusion may come from the naming, `batchUpload`. There was a different design for batch upload protocol a while ago that was aiming to reduce the data inconsistency during the segment push (this design was about using batchId for upload, keep all segments under batchId, and update metadata at once in the end)
   > > > > > > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > > > > > > @mcvsubbu I put some thoughts on this and it's likely that we won't change parameters for these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive API as a part of the step. In that sense, we don't need to associate higher level operation to the lower level primitive.
   > > > > > > e.g.
   > > > > > > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > > > > > > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   > > > > > 
   > > > > > 
   > > > > > When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   > > > > > If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments` It accommodates so many different options in the same API: refresh vs add, uri vs actual segment, encryption at source/sink , encryption scheme by source/tableschema ....
   > > > > > Now imagine if you were to write code for the API and you knew what exactly the intention of the API invoker was right in the beginning. Would the code not be simpler?
   > > > > > My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or support a generic code that handles all possible operations. as in : if (operation == merge) mergeSegments(); else if (operation == batchUpload()) batchUpload(); else throw IllegalOperation()
   > > > > 
   > > > > 
   > > > > I would argue the other way around: the API should be easy to use, instead of asking the user to understand all the supported operations in order to use it.
   > > > > Other than that, I don't see a behavior difference in the primitive for merge vs. batch upload, because the semantic of the primitive is just replacing the segments, and the purpose for which the primitive is used does not matter. For example, when you use a lock, you wouldn't pass in an extra parameter stating that the lock is for preventing deadlock or solving race conditions.
   > > > > Even if we need to add the operation (I don't see that happening), it is always easy to add extra query parameters, but removing them is impossible without a backward-incompatible change.
   > > > 
   > > > 
   > > > From what I have followed based on the code as well as the discussions here:
   > > > 
   > > > 1. This PR is just providing a primitive to replace m segments with n segments in a consistent way.
   > > > 2. It has not considered future possible use cases that could be built using this primitive (eg versioning). However, it is clear and complete in its definition, and doesn't expect to be modified for future usage.
   > > > 3. Any future possible usage of these primitives will create functionality that wraps the primitive as opposed to directly modifying the primitive's code itself (e.g., add new `if` blocks in the primitive).
   > > > 
   > > > Based on my understanding above, and the fact that future usage is unclear (even though it is clear that future usage is not allowed to change the primitive itself), I don't see a strong need to tie the primitive with an operation.
   > > 
   > > 
   > > @mayankshriv Thank you for the summary. I also prefer not to tie the primitives with the operation for now because the current primitive is complete by itself and it does not have any merge specific logic. When other operations start to use this primitive and if it becomes inevitable to add the operation specific logics, we can easily add the extra operation query parameter to make the code cleaner as @mcvsubbu suggested.
   > > @mcvsubbu What do you think about this?
   > 
   > That is the exact thing that I am trying to avoid.
   > Let us say, we introduce a "operation" query param later.
   > We will need to code around the fact that "operation == null" means it is segment merge.
   > And then maybe go around deprecating null and adding making it mandatory over time.
   > 
   > Sure, that can be done yes, but is that how we want it to evolve?
   
   What if we end up not modifying anything on this primitive? Then we will have an extra parameter that is never used, and we will also need to deprecate it. IMO, both approaches carry some inefficiency.
   
   The current primitive has no logic specific to segment merge so we don't need to tie `null` to segment merge. We just need to add new operation-specific logic. The logic will be something like the following: 
   
   Let's assume that we added versioning.
   ```
   if (operation != null) {
     // operation specific logic
     switch(operation) {
       case VERSIONING:
         performVersioningSpecificLogic();
         ...
     }
   } 
   
   // existing logic
   ```
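
A self-contained rendering of the sketch above may make the point easier to check. It assumes a hypothetical `Operation` enum with a single `VERSIONING` value and records the executed steps in a list instead of touching any controller state; none of these names come from the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: Operation and the step strings are hypothetical.
class ReplaceWithOptionalOperation {
  enum Operation { VERSIONING }

  /** Returns the steps executed, in order, for the given (possibly null) operation. */
  static List<String> startReplaceSegments(Operation operation) {
    List<String> steps = new ArrayList<>();
    if (operation != null) {
      // Operation-specific logic runs only when the caller names an operation.
      switch (operation) {
        case VERSIONING:
          steps.add("versioning-specific logic");
          break;
        default:
          break;
      }
    }
    // Existing logic: shared by every caller, including those passing null.
    steps.add("existing replacement logic");
    return steps;
  }

  public static void main(String[] args) {
    System.out.println(startReplaceSegments(null));
    System.out.println(startReplaceSegments(Operation.VERSIONING));
  }
}
```

In this shape a missing `operation` is not tied to merge: it simply means that no extra step runs before the shared replacement logic.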




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458407855



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Validate the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId does not exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Validate the request against every existing lineage entry
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String
+              .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                      + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                  lineageEntry.getSegmentsFrom(), finalSegmentsFrom));
+
+          // Check that merged segments name cannot be the same.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+              "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                  + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+              lineageEntry.getSegmentsTo(), segmentsTo));
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param batchId
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                batchId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        Preconditions.checkArgument(lineageEntry != null,
+            String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId));
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()), String.format(
+            "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable));
+
+        // No-op if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",
+              tableNameWithType, batchId);
+          return true;
+        }
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED,
+                System.currentTimeMillis());
+        segmentLineage.updateLineageEntry(batchId, newLineageEntry);
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, batchId = %s)", tableNameWithType, batchId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("endBatchUpload is successfully processed. (tableNameWithType = {}, batchId = {})", tableNameWithType,

Review comment:
       We can easily find the lineage info from zookeeper (and we can also add an API that fetches lineage entry given batchId).
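
The read-modify-write update with a version check that the Javadoc in this diff describes can be sketched in isolation. This is a minimal illustration, assuming a hypothetical in-memory `VersionedStore` in place of the Helix property store and a fixed attempt count in place of `DEFAULT_RETRY_POLICY`; it is not the code from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a versioned property store (not the Helix API).
class VersionedStore {
  static final class Snapshot {
    final Map<String, String> data;
    final int version;

    Snapshot(Map<String, String> data, int version) {
      this.data = data;
      this.version = version;
    }
  }

  private Map<String, String> _data = new HashMap<>();
  private int _version = 0;

  /** Returns a copy of the data together with the version it was read at. */
  synchronized Snapshot read() {
    return new Snapshot(new HashMap<>(_data), _version);
  }

  /** Writes only if nothing else has been written since expectedVersion was read. */
  synchronized boolean writeIfVersionMatches(Map<String, String> newData, int expectedVersion) {
    if (expectedVersion != _version) {
      return false;
    }
    _data = new HashMap<>(newData);
    _version++;
    return true;
  }
}

class OptimisticUpdate {
  /** Re-reads and re-applies the change until the versioned write succeeds or attempts run out. */
  static boolean update(VersionedStore store, UnaryOperator<Map<String, String>> modify, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      VersionedStore.Snapshot snapshot = store.read();        // read
      Map<String, String> updated = modify.apply(snapshot.data); // modify
      if (store.writeIfVersionMatches(updated, snapshot.version)) { // write
        return true;
      }
      // A concurrent writer got in first: loop and retry against fresh data.
    }
    return false;
  }

  public static void main(String[] args) {
    VersionedStore store = new VersionedStore();
    boolean ok = update(store, m -> { m.put("batch-1", "IN_PROGRESS"); return m; }, 3);
    System.out.println(ok + " " + store.read().data);
  }
}
```

Because validation runs inside the retried block, each attempt checks the request against the latest lineage rather than a stale copy, which is the reason for placing the checks inside the lambda in the diff above.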






[GitHub] [incubator-pinot] Jackie-Jiang commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662610355


   > > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > > 
   > > > > > > > 
   > > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > > 
   > > > > > > 
   > > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > > 
   > > > > > 
   > > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > > 
   > > > > 
   > > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > > > 
   > > > 
   > > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > > 
   > > 
   > > I think that the confusion may come from the naming, `batchUpload`. There was a different design for batch upload protocol a while ago that was aiming to reduce the data inconsistency during the segment push (this design was about using batchId for upload, keep all segments under batchId, and update metadata at once in the end)
   > > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > > @mcvsubbu I put some thoughts on this and it's likely that we won't change parameters for these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive API as a part of the step. In that sense, we don't need to associate higher level operation to the lower level primitive.
   > > e.g.
   > > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   > 
   > When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   > 
   > If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments` It accommodates so many different options in the same API: refresh vs add, uri vs actual segment, encryption at source/sink , encryption scheme by source/tableschema ....
   > Now imagine if you were to write code for the API and you knew what exactly the intention of the API invoker was right in the beginning. Would the code not be simpler?
   > My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or support a generic code that handles all possible operations. as in : if (operation == merge) mergeSegments(); else if (operation == batchUpload()) batchUpload(); else throw IllegalOperation()
   
   I would argue the other way around: the API should be easy to use, instead of asking the user to understand all the supported operations in order to use it.
   Other than that, I don't see a behavior difference in the primitive for merge vs. batch upload, because the semantic of the primitive is just replacing the segments, and the purpose for which the primitive is used does not matter. For example, when you use a lock, you wouldn't pass in an extra parameter stating that the lock is for preventing deadlock or solving race conditions.
   Even if we need to add the operation (I don't see that happening), it is always easy to add extra query parameters, but removing them is impossible without a backward-incompatible change.
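
The argument that higher-level operations wrap the primitive rather than parameterize it can be illustrated with a small sketch. Everything here is an assumption made for illustration: `SegmentReplacer` is a hypothetical in-memory model, not the controller code in this PR, and the `merge` and `batchUpload` callers are invented stand-ins for the minion task and the build-and-push job mentioned in the thread.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model of the replacement primitive.
class SegmentReplacer {
  enum State { IN_PROGRESS, COMPLETED }

  private final Map<String, State> _entries = new HashMap<>();

  /** Registers the intent to replace segmentsFrom with segmentsTo and returns an id. */
  String startReplaceSegments(List<String> segmentsFrom, List<String> segmentsTo) {
    if (segmentsTo.isEmpty()) {
      throw new IllegalArgumentException("'segmentsTo' cannot be empty");
    }
    String id = UUID.randomUUID().toString();
    _entries.put(id, State.IN_PROGRESS);
    return id;
  }

  /** Marks the replacement identified by id as completed. */
  void endReplaceSegments(String id) {
    if (!_entries.containsKey(id)) {
      throw new IllegalArgumentException("Unknown id: " + id);
    }
    _entries.put(id, State.COMPLETED);
  }

  State getState(String id) {
    return _entries.get(id);
  }
}

// Hypothetical callers: both use the same primitive and pass no operation name.
class Callers {
  /** Merge: replace m old segments with n merged segments. */
  static String merge(SegmentReplacer replacer, List<String> oldSegments, List<String> mergedSegments) {
    String id = replacer.startReplaceSegments(oldSegments, mergedSegments);
    // ... the caller would upload the merged segments here ...
    replacer.endReplaceSegments(id);
    return id;
  }

  /** Batch upload: nothing is replaced, so segmentsFrom is empty. */
  static String batchUpload(SegmentReplacer replacer, List<String> newSegments) {
    String id = replacer.startReplaceSegments(Collections.emptyList(), newSegments);
    // ... the caller would upload the new segments here ...
    replacer.endReplaceSegments(id);
    return id;
  }

  public static void main(String[] args) {
    SegmentReplacer replacer = new SegmentReplacer();
    String mergeId = merge(replacer, Arrays.asList("s1", "s2"), Arrays.asList("merged1"));
    String uploadId = batchUpload(replacer, Arrays.asList("new1", "new2"));
    System.out.println(replacer.getState(mergeId) + " " + replacer.getState(uploadId));
  }
}
```

The primitive's signature is identical for both callers; what differs is only the code wrapped around the two calls.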




[GitHub] [incubator-pinot] snleee commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662752035


   > > > > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > > > > 
   > > > > > > > > > 
   > > > > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > > > > 
   > > > > > > > > 
   > > > > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > > > > 
   > > > > > > > 
   > > > > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > > > > 
   > > > > > > 
   > > > > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > > > > > 
   > > > > > 
   > > > > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > > > > 
   > > > > 
   > > > > I think that the confusion may come from the naming, `batchUpload`. There was a different design for batch upload protocol a while ago that was aiming to reduce the data inconsistency during the segment push (this design was about using batchId for upload, keep all segments under batchId, and update metadata at once in the end)
   > > > > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > > > > @mcvsubbu I put some thoughts on this and it's likely that we won't change parameters for these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive API as a part of the step. In that sense, we don't need to associate higher level operation to the lower level primitive.
   > > > > e.g.
   > > > > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > > > > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   > > > 
   > > > 
   > > > When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   > > > If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments` It accommodates so many different options in the same API: refresh vs add, uri vs actual segment, encryption at source/sink , encryption scheme by source/tableschema ....
   > > > Now imagine if you were to write code for the API and you knew what exactly the intention of the API invoker was right in the beginning. Would the code not be simpler?
   > > > My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or support a generic code that handles all possible operations. as in : if (operation == merge) mergeSegments(); else if (operation == batchUpload()) batchUpload(); else throw IllegalOperation()
   > > 
   > > 
   > > I would argue another way around, where API should be easy to use, instead of asking the user to understand all the supported operations in order to use it.
   > > Other than that, I don't see the behavior difference for the primitive for merge vs batch-upload because the semantic of the primitive is just replacing the segments, and it does not matter what is the purpose of using the primitive. For an example, when you use a lock, you wouldn't pass in an extra parameter stating that the lock is for preventing deadlock or solving race conditions.
   > > Even if we need to add the operation (I don't see that happen), it is always easy to add extra query parameters, but removing them is impossible without backward-incompatible change.
   > 
   > From what I have followed based on the code as well as the discussions here:
   > 
   > 1. This PR is just providing a primitive to replace m segments with n segments in a consistent way.
   > 2. It has not considered future possible use cases that could be built using this primitive (eg versioning). However, it is clear and complete in its definition, and doesn't expect to be modified for future usage.
   > 3. Any future possible usage of these primitives will create functionality that wraps the primitive as opposed to directly modifying the primitive's code itself (e.g., add new `if` blocks in the primitive).
   > 
   > Based on my understanding above, and the fact that future usage is unclear (even though it is clear that future usage is not allowed to change the primitive itself), I don't see a strong need to tie the primitive with an operation.
   
   @mayankshriv Thank you for the summary. I also prefer not to tie the primitives to an operation for now, because the current primitive is complete by itself and does not contain any merge-specific logic. If other operations start to use this primitive and operation-specific logic becomes unavoidable, we can easily add the extra "operation" query parameter to keep the code clean, as @mcvsubbu suggested.
   
   @mcvsubbu What do you think about this?
   
   




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458491923



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String

Review comment:
       Do we want to retry when a validation check fails? I think we should fail immediately if validation fails and retry only if the ZK update fails. What do you think?
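
To illustrate the distinction raised in this comment, here is a minimal sketch in plain Java (not the PR's code). A validation failure throws immediately, and only a lost optimistic write leads to another attempt. `VersionedLineageStore`, `Snapshot`, and `addEntry` are hypothetical names standing in for the property store record and the lineage update; the real retry policy and ZK access helpers are not modeled here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a versioned record in the property store.
class VersionedLineageStore {
  // Copy of the record together with the version it was read at.
  static final class Snapshot {
    final List<String> entryIds;
    final int version;

    Snapshot(List<String> entryIds, int version) {
      this.entryIds = entryIds;
      this.version = version;
    }
  }

  private final List<String> entryIds = new ArrayList<>();
  private int version = 0;

  synchronized Snapshot read() {
    return new Snapshot(new ArrayList<>(entryIds), version);
  }

  // Writes only if no other writer has updated the record since 'expectedVersion' was read.
  synchronized boolean compareAndSet(List<String> updated, int expectedVersion) {
    if (expectedVersion != version) {
      return false;
    }
    entryIds.clear();
    entryIds.addAll(updated);
    version++;
    return true;
  }
}

class RetryOnlyOnConflict {
  // Returns the number of attempts used to add 'entryId'.
  static int addEntry(VersionedLineageStore store, String entryId, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      VersionedLineageStore.Snapshot snapshot = store.read();
      // Validation failure: retrying cannot help, so fail fast instead of looping.
      if (snapshot.entryIds.contains(entryId)) {
        throw new IllegalArgumentException("Entry already exists: " + entryId);
      }
      List<String> updated = new ArrayList<>(snapshot.entryIds);
      updated.add(entryId);
      // Version conflict: another writer got in first, so re-read and try again.
      if (store.compareAndSet(updated, snapshot.version)) {
        return attempt;
      }
    }
    throw new IllegalStateException("Failed to update after " + maxAttempts + " attempts");
  }
}
```

In this sketch the exception type carries the decision: `IllegalArgumentException` escapes the loop on the first attempt, while a `false` return from `compareAndSet` is the only path that loops.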






[GitHub] [incubator-pinot] kishoreg commented on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
kishoreg commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662164476


   > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > 
   > > > 
   > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > 
   > > 
   > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > 
   > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   
   If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?




[GitHub] [incubator-pinot] mayankshriv commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mayankshriv commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662748149


   > > > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > > > 
   > > > > > > > > 
   > > > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > > > 
   > > > > > > > 
   > > > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > > > 
   > > > > > > 
   > > > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > > > 
   > > > > > 
   > > > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > > > > 
   > > > > 
   > > > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > > > 
   > > > 
   > > > I think that the confusion may come from the naming, `batchUpload`. There was a different design for batch upload protocol a while ago that was aiming to reduce the data inconsistency during the segment push (this design was about using batchId for upload, keep all segments under batchId, and update metadata at once in the end)
   > > > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > > > @mcvsubbu I put some thoughts on this and it's likely that we won't change parameters for these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive API as a part of the step. In that sense, we don't need to associate higher level operation to the lower level primitive.
   > > > e.g.
   > > > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > > > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   > > 
   > > 
   > > When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   > > If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments` It accommodates so many different options in the same API: refresh vs add, uri vs actual segment, encryption at source/sink , encryption scheme by source/tableschema ....
   > > Now imagine if you were to write code for the API and you knew what exactly the intention of the API invoker was right in the beginning. Would the code not be simpler?
   > > My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or support a generic code that handles all possible operations. as in : if (operation == merge) mergeSegments(); else if (operation == batchUpload()) batchUpload(); else throw IllegalOperation()
   > 
   > I would argue another way around, where API should be easy to use, instead of asking the user to understand all the supported operations in order to use it.
   > Other than that, I don't see the behavior difference for the primitive for merge vs batch-upload because the semantic of the primitive is just replacing the segments, and it does not matter what is the purpose of using the primitive. For an example, when you use a lock, you wouldn't pass in an extra parameter stating that the lock is for preventing deadlock or solving race conditions.
   > Even if we need to add the operation (I don't see that happen), it is always easy to add extra query parameters, but removing them is impossible without backward-incompatible change.
   
   From what I have followed based on the code as well as the discussions here:
   1. This PR is just providing a primitive to replace m segments with n segments in a consistent way.
   2. It has not considered future possible use cases that could be built using this primitive (eg versioning). However, it is clear and complete in its definition, and doesn't expect to be modified for future usage.
   3. Any future possible usage of these primitives will create functionality that wraps the primitive as opposed to directly modifying the primitive's code itself (e.g., add new `if` blocks in the primitive).
   
   Based on my understanding above, I don't see a strong need to tie the primitive with an operation.
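
As a rough sketch of point 3, assuming the `startReplaceSegments`/`endReplaceSegments` names suggested earlier in this thread: higher-level operations call the primitive and keep their own logic outside of it, so the primitive needs no "operation" argument. The interface, the in-memory implementation, and the `SegmentMergeTask` and `BatchUploadJob` wrappers below are all hypothetical and only model the call pattern, not the controller code.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical primitive: no "operation" argument, same behavior for every caller.
interface SegmentReplacePrimitive {
  String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo);

  void endReplaceSegments(String tableNameWithType, String replacementId);
}

// In-memory stand-in that only tracks the state of each replacement.
class InMemoryReplacePrimitive implements SegmentReplacePrimitive {
  final Map<String, String> stateById = new LinkedHashMap<>();

  @Override
  public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
    String replacementId = UUID.randomUUID().toString();
    stateById.put(replacementId, "IN_PROGRESS");
    return replacementId;
  }

  @Override
  public void endReplaceSegments(String tableNameWithType, String replacementId) {
    if (!stateById.containsKey(replacementId)) {
      throw new IllegalArgumentException("Unknown replacement id: " + replacementId);
    }
    stateById.put(replacementId, "COMPLETED");
  }
}

// Higher-level operations wrap the primitive; operation-specific steps live here.
class SegmentMergeTask {
  static String run(SegmentReplacePrimitive primitive, String table, List<String> oldSegments, String mergedSegment) {
    String id = primitive.startReplaceSegments(table, oldSegments, Collections.singletonList(mergedSegment));
    // Merge-specific work (building and uploading the merged segment) would go here.
    primitive.endReplaceSegments(table, id);
    return id;
  }
}

class BatchUploadJob {
  static String run(SegmentReplacePrimitive primitive, String table, List<String> newSegments) {
    String id = primitive.startReplaceSegments(table, Collections.emptyList(), newSegments);
    // Upload-specific work (pushing each new segment) would go here.
    primitive.endReplaceSegments(table, id);
    return id;
  }
}
```

Adding a third operation in this shape means adding a third wrapper class, with no new branch inside the primitive.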




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458506683



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String
+              .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                      + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                  lineageEntry.getSegmentsFrom(), finalSegmentsFrom));
+
+          // Check that merged segments name cannot be the same.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+              "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                  + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+              lineageEntry.getSegmentsTo(), segmentsTo));
+        }
+

Review comment:
       This check is actually covered by the initial validation
   
   ```
       // Check that all the segments from 'segmentsFrom' exist in the table
       Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
       Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom));
   
       // Check that all the segments from 'segmentTo' does not exist in the table.
       Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo));
   ```
   `segmentsForTable` is a superset of `segmentsFrom`, so checking `Collections.disjoint(segmentsForTable, segmentsTo)` also covers the check `Collections.disjoint(segmentsTo, segmentsFrom)`.
   
   I will add the test case for this.
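
The set argument above can be checked with a small standalone example. This is only an illustration with made-up segment names (`s1`, `merged_0`, and so on) and a hypothetical helper `passesInitialValidation`; it is not the test case that will be added to the PR.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class DisjointCheckDemo {
  // Mirrors the two initial validations: 'segmentsFrom' must all exist in the table,
  // and no segment in 'segmentsTo' may already exist in the table.
  static boolean passesInitialValidation(Set<String> segmentsForTable, List<String> segmentsFrom,
      List<String> segmentsTo) {
    return segmentsForTable.containsAll(segmentsFrom) && Collections.disjoint(segmentsForTable, segmentsTo);
  }

  public static void main(String[] args) {
    Set<String> segmentsForTable = new HashSet<>(Arrays.asList("s1", "s2", "s3"));

    // "s2" appears in both lists. Because it must be in the table to be a valid
    // 'segmentsFrom' entry, the disjoint check against the table already rejects it.
    System.out.println(passesInitialValidation(segmentsForTable, Arrays.asList("s1", "s2"),
        Arrays.asList("s2", "merged_0"))); // prints "false"

    // No overlap between the lists: both checks pass.
    System.out.println(passesInitialValidation(segmentsForTable, Arrays.asList("s1", "s2"),
        Arrays.asList("merged_0"))); // prints "true"
  }
}
```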






[GitHub] [incubator-pinot] Jackie-Jiang commented on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662145687


   > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > 
   > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   
   @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
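
To make the "replace `segmentsFrom` with `segmentsTo`" semantic concrete, here is a minimal sketch that models only the end result on a set of segment names. The `replace` helper and the segment names are hypothetical, and the sketch assumes that a new upload is expressed as an empty `segmentsFrom`; atomicity and the start/end protocol are not modeled.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class ReplaceSemantics {
  // Result of replacing 'segmentsFrom' with 'segmentsTo' in the table's segment set.
  static Set<String> replace(Set<String> tableSegments, List<String> segmentsFrom, List<String> segmentsTo) {
    Set<String> result = new TreeSet<>(tableSegments);
    result.removeAll(segmentsFrom);
    result.addAll(segmentsTo);
    return result;
  }

  public static void main(String[] args) {
    Set<String> table = new TreeSet<>(Arrays.asList("a", "b", "c"));

    // Merge: two old segments are replaced by one merged segment.
    System.out.println(replace(table, Arrays.asList("a", "b"), Arrays.asList("ab_merged"))); // [ab_merged, c]

    // New upload: nothing is removed, new segments are added (assumed empty 'segmentsFrom').
    System.out.println(replace(table, Collections.emptyList(), Arrays.asList("d", "e"))); // [a, b, c, d, e]

    // Backfill: one segment is swapped for its regenerated version.
    System.out.println(replace(table, Arrays.asList("c"), Arrays.asList("c_v2"))); // [a, b, c_v2]
  }
}
```

All three cases go through the same function with different arguments, which is the sense in which the primitive does not need to know the caller's operation.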




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458412551



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +460,46 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      StartBatchUploadRequest startBatchUploadRequest) {
+    try {
+      String tableNameWithType =
+          TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      String batchId = _pinotHelixResourceManager
+          .startBatchUpload(tableNameWithType, startBatchUploadRequest.getSegmentsFrom(),
+              startBatchUploadRequest.getSegmentsTo());
+      return Response.ok(JsonUtils.newObjectNode().put("batchId", batchId)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @POST
+  @Path("segments/{tableName}/endBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "End the batch upload", notes = "End the batch upload")
+  public Response endBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Batch Id returned by startBatchUpload API") @QueryParam("batchId") String batchId) {
+    try {
+      String tableNameWithType =
+          TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      // Check that the batch id is valid
+      Preconditions.checkNotNull(batchId, "'batchId' cannot be null");

Review comment:
       fixed






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458407855



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId does not exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String
+              .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                      + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                  lineageEntry.getSegmentsFrom(), finalSegmentsFrom));
+
+          // Check that merged segments name cannot be the same.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+              "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                  + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+              lineageEntry.getSegmentsTo(), segmentsTo));
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param batchId
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                batchId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        Preconditions.checkArgument(lineageEntry != null,
+            String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId));
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()), String.format(
+            "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable));
+
+        // No-op if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",
+              tableNameWithType, batchId);
+          return true;
+        }
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED,
+                System.currentTimeMillis());
+        segmentLineage.updateLineageEntry(batchId, newLineageEntry);
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, batchId = %s)", tableNameWithType, batchId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("endBatchUpload is successfully processed. (tableNameWithType = {}, batchId = {})", tableNameWithType,

Review comment:
       The lineage info is easy to look up in ZooKeeper. If we need to fetch it frequently, we can also add an API that returns the lineage entry for a given batchId.
   
   






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458997156



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,160 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start segment replace phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = SegmentLineageUtils.generateLineageEntryId();
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+

Review comment:
       Do you also need to check that none of the segments in `segmentsFrom` are part of another merge that has been started but not finished?
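
A minimal sketch of the check asked about above, assuming simplified stand-in types: `Entry`, `State`, and `isSafeToStart` are hypothetical names for illustration, not Pinot's `LineageEntry` or `SegmentLineage` API. It rejects a new replacement when any requested source segment already belongs to an entry that is still in progress.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class InProgressOverlapCheck {
  enum State { IN_PROGRESS, COMPLETED }

  // Hypothetical stand-in for a lineage entry; not Pinot's LineageEntry class.
  static final class Entry {
    final List<String> segmentsFrom;
    final State state;

    Entry(List<String> segmentsFrom, State state) {
      this.segmentsFrom = segmentsFrom;
      this.state = state;
    }
  }

  // Returns true when none of the requested source segments belongs to a
  // replacement that has been started but not finished.
  static boolean isSafeToStart(List<Entry> existing, List<String> requestedSegmentsFrom) {
    for (Entry entry : existing) {
      if (entry.state == State.IN_PROGRESS
          && !Collections.disjoint(entry.segmentsFrom, requestedSegmentsFrom)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    List<Entry> existing = new ArrayList<>();
    existing.add(new Entry(Arrays.asList("s1", "s2"), State.IN_PROGRESS));
    existing.add(new Entry(Collections.singletonList("s3"), State.COMPLETED));

    // "s2" is part of an unfinished replacement, so this request is rejected.
    System.out.println(isSafeToStart(existing, Arrays.asList("s2", "s4"))); // prints false
    // "s3" only appears in a completed entry, so this sketch allows it.
    System.out.println(isSafeToStart(existing, Arrays.asList("s3", "s4"))); // prints true
  }
}
```

Whether completed entries should also block a request is a separate policy decision; the sketch only covers the in-progress case raised in the comment.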






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457838835



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -70,6 +71,17 @@ public String addLineageEntry(LineageEntry lineageEntry) {
     return lineageId;
   }
 
+  /**
+   * Add lineage entry to the segment lineage metadata with the given lineage entry id
+   * @param lineageEntryId the id for the lineage entry
+   * @param lineageEntry a lineage entry
+   * @return the id for the input lineage entry for the access
+   */
+  public String addLineageEntry(String lineageEntryId, LineageEntry lineageEntry) {

Review comment:
       One reason I added `void addLineageEntry(String lineageEntryId, LineageEntry lineageEntry)` is that there is no easy way to pass `lineageEntryId` out of the retry policy block. (Otherwise `lineageEntryId` is generated within the retry function and needs to be passed back to the top-level caller.)
   
   For this reason, I have removed `String addLineageEntry(LineageEntry lineageEntry)` and kept `void addLineageEntry(String lineageEntryId, LineageEntry lineageEntry)`.
   
   I also added `void updateLineageEntry(String lineageEntryId, LineageEntry lineageEntry)`.






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458503322



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId does not exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));

Review comment:
       I generate `batchId` outside of the retry block because, with the current `RetryPolicy` interface, it is a bit hard to pass information from the retry block back to the caller. For now, let's not deal with id collisions; throwing an exception should be fine. If collisions ever show up frequently (unlikely, since UUID collisions are extremely rare), we can handle them by improving the retry policy interface so that it can pass information back to the caller.
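
The trade-off described above can be sketched as follows. This is an illustration under stated assumptions: `attempt` is a hypothetical retry helper written for this example, not Pinot's `RetryPolicy` interface. Option A creates the id before the retry block so the caller already holds it; option B creates it inside the block and needs a holder object to hand it back.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class RetryIdSketch {
  // Hypothetical retry helper: runs the operation until it returns true
  // or the attempts run out. It cannot return a value to the caller.
  static void attempt(int maxAttempts, Supplier<Boolean> operation) {
    for (int i = 0; i < maxAttempts; i++) {
      if (Boolean.TRUE.equals(operation.get())) {
        return;
      }
    }
    throw new IllegalStateException("Operation failed after " + maxAttempts + " attempts");
  }

  // Option A: generate the id before the retry block. Every attempt reuses
  // the same id, and the caller can return it directly.
  static String idGeneratedOutside() {
    final String batchId = UUID.randomUUID().toString();
    attempt(3, () -> {
      // A read-modify-write that uses batchId would go here.
      return true;
    });
    return batchId;
  }

  // Option B: generate the id inside the retry block. A holder is needed
  // because the lambda cannot assign to a local variable of the caller.
  static String idGeneratedInside() {
    AtomicReference<String> holder = new AtomicReference<>();
    attempt(3, () -> {
      holder.set(UUID.randomUUID().toString());
      return true;
    });
    return holder.get();
  }

  public static void main(String[] args) {
    System.out.println(idGeneratedOutside());
    System.out.println(idGeneratedInside());
  }
}
```

Option B would let a retry pick a fresh id after a collision, at the cost of the extra holder; option A keeps the code simpler and treats a collision as an error.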






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457845659



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId does not exist in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {

Review comment:
       changed to `Collections.disjoint`
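
For reference, a small sketch of the two ways to test whether two segment lists overlap. The class and method names are made up for this example; only `Collections.disjoint` and the stream calls are standard JDK APIs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DisjointSketch {
  // Stream-based form, as in the earlier revision of the check.
  static boolean overlapsWithStream(List<String> a, List<String> b) {
    return a.stream().anyMatch(b::contains);
  }

  // Collections.disjoint returns true when the two collections share no
  // element, so an overlap is its negation.
  static boolean overlapsWithDisjoint(List<String> a, List<String> b) {
    return !Collections.disjoint(a, b);
  }

  public static void main(String[] args) {
    List<String> existing = Arrays.asList("s1", "s2");
    List<String> requested = Arrays.asList("s2", "s3");
    List<String> unrelated = Arrays.asList("s4");

    System.out.println(overlapsWithStream(existing, requested));   // prints true
    System.out.println(overlapsWithDisjoint(existing, requested)); // prints true
    System.out.println(overlapsWithStream(existing, unrelated));   // prints false
    System.out.println(overlapsWithDisjoint(existing, unrelated)); // prints false
  }
}
```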






[GitHub] [incubator-pinot] kishoreg commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
kishoreg commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-663098410


   @snleee Are we good to go? Can we add some docs on how to use this?
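
Until such docs exist, the intended call sequence can be sketched with an in-memory model. Everything here is an assumption made for illustration: `LineageStore`, `start`, and `end` are hypothetical stand-ins for the property store and the two controller APIs, and the segment lists are omitted for brevity. The model keeps only the parts discussed in this PR: a start call that records an IN_PROGRESS entry, an end call that marks it COMPLETED, and a version-checked write inside a retry loop.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class ReplaceSegmentsFlowSketch {
  enum State { IN_PROGRESS, COMPLETED }

  // Hypothetical in-memory stand-in for the lineage metadata in the property store.
  static final class LineageStore {
    private final Map<String, State> entries = new HashMap<>();
    private int version = 0;

    synchronized int version() {
      return version;
    }

    synchronized State get(String id) {
      return entries.get(id);
    }

    // The write succeeds only if nothing was written since expectedVersion was read.
    synchronized boolean write(String id, State state, int expectedVersion) {
      if (expectedVersion != version) {
        return false;
      }
      entries.put(id, state);
      version++;
      return true;
    }
  }

  // Step 1: register the replacement and get back an id.
  static String start(LineageStore store) {
    String id = UUID.randomUUID().toString();
    for (int i = 0; i < 3; i++) {
      int expectedVersion = store.version();
      if (store.write(id, State.IN_PROGRESS, expectedVersion)) {
        return id;
      }
    }
    throw new IllegalStateException("Failed to record the lineage entry");
  }

  // Step 3: after the new segments are uploaded (step 2), mark the entry completed.
  static void end(LineageStore store, String id) {
    for (int i = 0; i < 3; i++) {
      int expectedVersion = store.version();
      State current = store.get(id);
      if (current == null) {
        throw new IllegalArgumentException("Invalid batch id: " + id);
      }
      if (current == State.COMPLETED) {
        return; // Already completed, nothing to update.
      }
      if (store.write(id, State.COMPLETED, expectedVersion)) {
        return;
      }
    }
    throw new IllegalStateException("Failed to update the lineage entry");
  }

  public static void main(String[] args) {
    LineageStore store = new LineageStore();
    String id = start(store);
    System.out.println(store.get(id)); // prints IN_PROGRESS
    // ... upload the new segments here ...
    end(store, id);
    System.out.println(store.get(id)); // prints COMPLETED
  }
}
```

Calling `end` a second time with the same id leaves the store untouched, which mirrors the no-op branch in the quoted diff.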
   




[GitHub] [incubator-pinot] snleee merged pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
snleee merged pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712


   




[GitHub] [incubator-pinot] mcvsubbu commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662752341


   OK, my last attempt on this:
   
   If this API does not claim to solve future problems, it should not take up a command name as general as 'startBatchUpload'. Otherwise, any other functionality that needs a batch upload (e.g. versioning, consistent upload of refresh segments, etc.) may not be able to use that name without lots of ugly code that is hard to maintain when it comes to backward compatibility (see the current POST /segments implementation for an example).
   
   The options are to introduce an operation parameter as I suggested, or to rename the API.
   
   I prefer the extra query argument but will settle for a rename.




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457843106



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +459,45 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)

Review comment:
       fixed






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457845713



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId does not exist in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segments name cannot be the same for different lineage entry
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {

Review comment:
       Yes, we need to check both.






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458427202



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that all the segments from 'segmentTo' does not exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String

Review comment:
       You need to catch the exception for the retry to happen. The same applies to the other places that can throw an exception.
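
One way to read the comment above, as a minimal sketch: it assumes a retry helper that re-runs the operation only when it returns `false` and stops as soon as the operation throws. `RetrySketch`, `attempt`, and `retriable` are hypothetical names for illustration and are not Pinot's `RetryPolicy` API; treating `IllegalStateException` as the transient failure is also an assumption.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a retry policy; not the Pinot implementation.
final class RetrySketch {
  private RetrySketch() {
  }

  // Runs the operation up to maxAttempts times. A 'false' result means "try again";
  // an exception thrown by the operation escapes immediately and ends the loop.
  static boolean attempt(int maxAttempts, BooleanSupplier operation) {
    for (int i = 0; i < maxAttempts; i++) {
      if (operation.getAsBoolean()) {
        return true;
      }
    }
    return false;
  }

  // Wraps an operation so that a transient failure is reported as 'false' (retriable)
  // instead of propagating and aborting the retry loop.
  static BooleanSupplier retriable(BooleanSupplier operation) {
    return () -> {
      try {
        return operation.getAsBoolean();
      } catch (IllegalStateException e) {
        // Assumption: IllegalStateException marks a transient failure in this sketch.
        return false;
      }
    };
  }
}
```

With the wrapper, a read-modify-write step that fails transiently is attempted again; without it, the first exception ends the whole `attempt` call.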

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -24,7 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
+import org.apache.commons.lang.StringUtils;

Review comment:
       Let's use `lang3`
   ```suggestion
   import org.apache.commons.lang3.StringUtils;
   ```

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineageUtil.java
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.lineage;
+
+import java.util.UUID;
+
+
+/**
+ * Util class for Segment Lineage
+ */
+public class SegmentLineageUtil {

Review comment:
       (nit) `SegmentLineageUtils`
   Also add a private constructor
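
A minimal sketch of what the nit above asks for, assuming the utility class only hosts static helpers. The class name carries a `Sketch` suffix to make clear it is illustrative; `generateLineageEntryId` is the method name used elsewhere in this review, and the UUID-based body is an assumption based on the `java.util.UUID` import in the diff.

```java
import java.util.UUID;

// Static-only utility class: the private constructor prevents instantiation.
final class SegmentLineageUtilsSketch {
  private SegmentLineageUtilsSketch() {
  }

  // Returns a random UUID string to use as a lineage entry id (assumed behavior).
  static String generateLineageEntryId() {
    return UUID.randomUUID().toString();
  }
}
```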

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that all the segments from 'segmentTo' does not exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));

Review comment:
       Don't throw the exception directly, as that will abort the retry.
   You can keep generating a `batchId` until you get a new one (in most cases there is no collision):
   ```suggestion
           String batchId = SegmentLineageUtils.generateLineageEntryId();
           while (segmentLineage.getLineageEntry(batchId) != null) {
             batchId = SegmentLineageUtils.generateLineageEntryId();
           }
   ```
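
The same regenerate-until-unique idea can be sketched on its own, without the lineage classes. Here the existing ids are modeled as a plain `Set<String>` and the generator is passed in, which are both assumptions made so the loop can be exercised deterministically; `LineageIdSketch` and `newUniqueId` are hypothetical names.

```java
import java.util.Set;
import java.util.function.Supplier;

final class LineageIdSketch {
  private LineageIdSketch() {
  }

  // Keeps drawing candidate ids until one is not already present in existingIds.
  // With random UUIDs the loop body is expected to run rarely, if ever.
  static String newUniqueId(Set<String> existingIds, Supplier<String> generator) {
    String id = generator.get();
    while (existingIds.contains(id)) {
      id = generator.get();
    }
    return id;
  }
}
```

Because the check happens inside the retried block, a collision costs one more draw rather than a failed request.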

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -108,8 +119,8 @@ public static SegmentLineage fromZNRecord(ZNRecord record) {
       String lineageId = listField.getKey();
       List<String> value = listField.getValue();
       Preconditions.checkState(value.size() == 4);
-      List<String> segmentsFrom = Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
-      List<String> segmentsTo = Arrays.asList(value.get(1).split(COMMA_SEPARATOR));
+      List<String> segmentsFrom = Arrays.asList(StringUtils.split(value.get(0), ','));

Review comment:
       (nit) Use constant?
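
For context on the changed line, a small sketch in plain Java. The PR does not state why the split call was changed; one plausible reason is that `String.split` on an empty string yields a one-element array containing `""`, which would turn an empty `segmentsFrom` into a list with one blank segment name. The helper below is hypothetical and only reuses the `COMMA_SEPARATOR` constant name from the diff.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class SplitSketch {
  private static final String COMMA_SEPARATOR = ",";

  private SplitSketch() {
  }

  // Splits a comma-separated value, mapping null or empty input to an empty list
  // instead of the single empty token that String.split would produce.
  static List<String> splitSegments(String value) {
    if (value == null || value.isEmpty()) {
      return Collections.emptyList();
    }
    return Arrays.asList(value.split(COMMA_SEPARATOR));
  }
}
```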

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();

Review comment:
       Use `SegmentLineageUtils` to generate the id, and move the generation into the retry logic so that a collision is handled there.

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that all the segments from 'segmentTo' does not exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String
+              .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                      + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                  lineageEntry.getSegmentsFrom(), finalSegmentsFrom));
+
+          // Check that merged segments name cannot be the same.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+              "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                  + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+              lineageEntry.getSegmentsTo(), segmentsTo));
+        }
+

Review comment:
       `segmentsTo` must not be any of the existing `segmentsFrom` as well
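
A sketch of the validation with the extra check from the comment above added. `Entry` is a hypothetical, stripped-down stand-in for a lineage entry that holds only the two segment lists; the error messages are illustrative.

```java
import java.util.Collections;
import java.util.List;

final class LineageValidationSketch {
  // Hypothetical minimal lineage entry: only the two segment lists.
  static final class Entry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;

    Entry(List<String> segmentsFrom, List<String> segmentsTo) {
      this.segmentsFrom = segmentsFrom;
      this.segmentsTo = segmentsTo;
    }
  }

  private LineageValidationSketch() {
  }

  // Rejects a request whose segment lists conflict with any existing entry.
  static void validate(List<Entry> existing, List<String> segmentsFrom, List<String> segmentsTo) {
    for (Entry entry : existing) {
      check(Collections.disjoint(entry.segmentsFrom, segmentsFrom),
          "requested segmentsFrom were already replaced by an existing entry");
      check(Collections.disjoint(entry.segmentsTo, segmentsTo),
          "requested segmentsTo reuse names from an existing entry's segmentsTo");
      // The additional check: new names must not reuse names of replaced segments either.
      check(Collections.disjoint(entry.segmentsFrom, segmentsTo),
          "requested segmentsTo reuse names from an existing entry's segmentsFrom");
    }
  }

  private static void check(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

Note that a requested `segmentsFrom` may still overlap an existing entry's `segmentsTo` in this sketch, which allows already-merged segments to be replaced again.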

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that all the segments from 'segmentTo' does not exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check

Review comment:
       ```suggestion
           // Check ...
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that all the segments from 'segmentTo' does not exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;

Review comment:
       (nit) I don't think you need to have this extra `finalSegmentsFrom` because `segmentsFrom` is never changed

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartBatchUploadRequest.java
##########
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.Nullable;
+
+
+/**
+ * Request object for startBatchUpload API.
+ *
+ * 1. segmentsFrom : original segments. This field can be empty in case the user tries to upload the original segments
+ *    and wants to achieve the atomic update of multiple segments.
+ * 2. segmentsTo : merged segments.
+ */
+public class StartBatchUploadRequest {
+  private List<String> _segmentsFrom;
+  private List<String> _segmentsTo;
+
+  public StartBatchUploadRequest(@JsonProperty("segmentsFrom") @Nullable List<String> segmentsFrom,
+      @JsonProperty("segmentsTo") List<String> segmentsTo) {
+    _segmentsFrom = (segmentsFrom == null) ? new ArrayList<>() : segmentsFrom;

Review comment:
       (nit)
   ```suggestion
       _segmentsFrom = (segmentsFrom == null) ? Collections.emptyList() : segmentsFrom;
   ```






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457825742



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/BatchId.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+public class BatchId {

Review comment:
       I was trying to implement the following as the response:
   
   ```
   POST /startBatchUpload
   
   200
   {
     "batchId": "xxx"
   }
   
   ```
   
   One easy way to achieve this with Jersey is to return a Java object.






[GitHub] [incubator-pinot] Jackie-Jiang edited a comment on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang edited a comment on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662186212


   > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > This way, we don't assume that if the source segments are null, the operation MUST be a new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > 
   > > > > 
   > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > 
   > > > 
   > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > 
   > > 
   > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > 
   > If that's the case, then I agree with Subbu. It's better to name this primitive ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   
   The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay with renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to associate the primitives with any specific operation (by taking an extra "operation" query param), because the behavior should be the same for all operations.
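
To make the operation-independent semantics concrete, here is a small in-memory sketch of a start/end replace primitive. It is illustrative only: the `State` values, the `visibleSegments` rule (old segments until the end call, new segments afterwards), and the storage in a `HashMap` are assumptions and are not taken from the PR, which keeps lineage metadata in the property store.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// In-memory sketch only; everything except the two primitive names is hypothetical.
final class ReplaceSegmentsSketch {
  enum State { IN_PROGRESS, COMPLETED }

  private static final class Entry {
    final List<String> _segmentsFrom;
    final List<String> _segmentsTo;
    State _state = State.IN_PROGRESS;

    Entry(List<String> segmentsFrom, List<String> segmentsTo) {
      _segmentsFrom = new ArrayList<>(segmentsFrom);
      _segmentsTo = new ArrayList<>(segmentsTo);
    }
  }

  private final Map<String, Entry> _entries = new HashMap<>();

  // Registers the intent to replace segmentsFrom with segmentsTo and returns the entry id.
  // segmentsFrom may be empty, which models an initial upload.
  String startReplaceSegments(List<String> segmentsFrom, List<String> segmentsTo) {
    if (segmentsTo.isEmpty()) {
      throw new IllegalArgumentException("'segmentsTo' cannot be empty");
    }
    String entryId = UUID.randomUUID().toString();
    _entries.put(entryId, new Entry(segmentsFrom, segmentsTo));
    return entryId;
  }

  // Marks the replacement as done; from here on segmentsTo are the visible ones.
  void endReplaceSegments(String entryId) {
    entry(entryId)._state = State.COMPLETED;
  }

  // Assumed visibility rule: segmentsFrom while in progress, segmentsTo once completed.
  List<String> visibleSegments(String entryId) {
    Entry entry = entry(entryId);
    return entry._state == State.COMPLETED ? entry._segmentsTo : entry._segmentsFrom;
  }

  private Entry entry(String entryId) {
    Entry entry = _entries.get(entryId);
    if (entry == null) {
      throw new IllegalArgumentException("Unknown lineage entry id: " + entryId);
    }
    return entry;
  }
}
```

Merge, batch upload, and batch replace would differ only in which lists the caller passes, which is the argument made above for not adding an "operation" parameter.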




[GitHub] [incubator-pinot] snleee commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662756622


   > OK, My last attempt on this:
   > 
   > If this API does not claim to solve a future problem, do not take up the namespace of the command like 'startBatchUpload'. In this case, any other functionality (e.g. versioning, consistent upload of refresh segments, etc.) that may need a batch upload may not be able to use this without lots of ugly code that is hard to maintain when it comes to backward compatibility (see current POST /segments implementation for an example)
   > 
   > Options are to introduce an operation like I suggested, or rename it.
   > 
   > I prefer the extra query argument but will settle for a rename.
   
   @mcvsubbu I already changed the names to `startReplaceSegments` and `endReplaceSegments`. What do you think of the names?


[GitHub] [incubator-pinot] mayankshriv edited a comment on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mayankshriv edited a comment on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662748149


   > > > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > > > This way, we don't assume that if source segments is null, then the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > > > 
   > > > > > > > > 
   > > > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > > > 
   > > > > > > > 
   > > > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > > > 
   > > > > > > 
   > > > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > > > 
   > > > > > 
   > > > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > > > > 
   > > > > 
   > > > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > > > 
   > > > 
   > > > I think that the confusion may come from the naming, `batchUpload`. There was a different design for batch upload protocol a while ago that was aiming to reduce the data inconsistency during the segment push (this design was about using batchId for upload, keep all segments under batchId, and update metadata at once in the end)
   > > > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > > > @mcvsubbu I put some thoughts on this and it's likely that we won't change parameters for these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive API as a part of the step. In that sense, we don't need to associate higher level operation to the lower level primitive.
   > > > e.g.
   > > > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > > > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   > > 
   > > 
   > > When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   > > If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments` It accommodates so many different options in the same API: refresh vs add, uri vs actual segment, encryption at source/sink , encryption scheme by source/tableschema ....
   > > Now imagine if you were to write code for the API and you knew what exactly the intention of the API invoker was right in the beginning. Would the code not be simpler?
   > > My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or support a generic code that handles all possible operations. as in : if (operation == merge) mergeSegments(); else if (operation == batchUpload()) batchUpload(); else throw IllegalOperation()
   > 
   > I would argue another way around, where API should be easy to use, instead of asking the user to understand all the supported operations in order to use it.
   > Other than that, I don't see the behavior difference for the primitive for merge vs batch-upload because the semantic of the primitive is just replacing the segments, and it does not matter what is the purpose of using the primitive. For an example, when you use a lock, you wouldn't pass in an extra parameter stating that the lock is for preventing deadlock or solving race conditions.
   > Even if we need to add the operation (I don't see that happen), it is always easy to add extra query parameters, but removing them is impossible without backward-incompatible change.
   
   From what I have followed based on the code as well as the discussions here:
   1. This PR is just providing a primitive to replace m segments with n segments in a consistent way.
   2. It has not considered possible future use cases that could be built using this primitive (e.g., versioning). However, it is clear and complete in its definition, and is not expected to be modified for future usage.
   3. Any future possible usage of these primitives will create functionality that wraps the primitive as opposed to directly modifying the primitive's code itself (e.g., add new `if` blocks in the primitive).
   
   Based on my understanding above, I don't see a strong need to tie the primitive with an operation.
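
   A minimal sketch of points 1 and 3, assuming an in-memory stand-in for the controller. `ReplaceSegmentsSketch`, `merge` and `batchUpload` are hypothetical names, and the real implementation tracks lineage in the property store rather than swapping a set. The wrappers call the primitive without telling it which operation they are performing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical in-memory stand-in for the controller; not the PR's code.
class ReplaceSegmentsSketch {
  // One pending replacement: the segments to hide and the segments to show.
  private static final class Entry {
    final List<String> from;
    final List<String> to;

    Entry(List<String> from, List<String> to) {
      this.from = from;
      this.to = to;
    }
  }

  private final Set<String> visible = new HashSet<>();
  private final Map<String, Entry> pending = new HashMap<>();

  ReplaceSegmentsSketch(List<String> initialSegments) {
    visible.addAll(initialSegments);
  }

  // Primitive, step 1: validate the request and record the intended replacement.
  String startReplaceSegments(List<String> segmentsFrom, List<String> segmentsTo) {
    if (segmentsTo.isEmpty()) {
      throw new IllegalArgumentException("'segmentsTo' cannot be empty");
    }
    if (!visible.containsAll(segmentsFrom)) {
      throw new IllegalArgumentException("Not all segments from 'segmentsFrom' exist");
    }
    if (!Collections.disjoint(visible, segmentsTo)) {
      throw new IllegalArgumentException("Segments from 'segmentsTo' must not exist yet");
    }
    String id = UUID.randomUUID().toString();
    pending.put(id, new Entry(new ArrayList<>(segmentsFrom), new ArrayList<>(segmentsTo)));
    return id;
  }

  // Primitive, step 2: hide 'segmentsFrom' and show 'segmentsTo' in one step.
  void endReplaceSegments(String id) {
    Entry entry = pending.remove(id);
    if (entry == null) {
      throw new IllegalArgumentException("Unknown replacement id: " + id);
    }
    visible.removeAll(entry.from);
    visible.addAll(entry.to);
  }

  Set<String> visibleSegments() {
    return new HashSet<>(visible);
  }

  // Higher-level wrapper: merge is "replace the old segments with one merged segment".
  void merge(List<String> oldSegments, String mergedSegment) {
    endReplaceSegments(startReplaceSegments(oldSegments, Collections.singletonList(mergedSegment)));
  }

  // Higher-level wrapper: batch upload is "replace nothing with the new segments".
  void batchUpload(List<String> newSegments) {
    endReplaceSegments(startReplaceSegments(Collections.emptyList(), newSegments));
  }
}
```

   Neither wrapper adds an `if` block to the primitive; any operation-specific logic would live in the wrapper.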


[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458488496



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments in 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;

Review comment:
       Yes, you're right. The previous code needed it because I reassigned the value after some validation, but this is no longer needed.




[GitHub] [incubator-pinot] Jackie-Jiang commented on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662161988


   > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > This way, we don't assume that if source segments is null, then the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > 
   > > 
   > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > 
   > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > 
   > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > 
   > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   
   This API will be used for merge, batch upload, and batch replace, but I don't want to associate it with a certain type of operation because its semantics are very clear and independent of the actual operation. It will replace the segments in `segmentsFrom` with the segments in `segmentsTo` atomically, and this will hold for all operations.


[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458491923



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments in 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String

Review comment:
       Do we want to retry on a validation check failure? I thought we could abort the process if validation fails and retry only on the ZK update. What do you think?
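
       A minimal sketch of that split, assuming a hypothetical in-memory `VersionedStore` in place of the ZK-backed property store. None of these names are from the PR, and this is not Pinot's retry policy. A validation failure throws immediately, and only a failed version check triggers another attempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; the store below only imitates a versioned ZK record.
class ValidateThenRetrySketch {
  // Immutable view of the record together with the version it was read at.
  static final class Snapshot {
    final List<String> entries;
    final int version;

    Snapshot(List<String> entries, int version) {
      this.entries = entries;
      this.version = version;
    }
  }

  static final class VersionedStore {
    private List<String> entries = new ArrayList<>();
    private int version = 0;

    synchronized Snapshot read() {
      return new Snapshot(new ArrayList<>(entries), version);
    }

    // Writes only if nobody else has written since 'expectedVersion' was read.
    synchronized boolean writeIfVersion(List<String> newEntries, int expectedVersion) {
      if (version != expectedVersion) {
        return false;
      }
      entries = new ArrayList<>(newEntries);
      version++;
      return true;
    }
  }

  // Returns the number of attempts it took to add the entry.
  static int addEntry(VersionedStore store, String entryId, int maxAttempts) {
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      Snapshot snapshot = store.read();

      // Validation failure: retrying is unlikely to help, so fail fast.
      if (snapshot.entries.contains(entryId)) {
        throw new IllegalArgumentException("Entry already exists: " + entryId);
      }

      List<String> updated = new ArrayList<>(snapshot.entries);
      updated.add(entryId);

      // Version conflict: another writer got in first, so read again and retry.
      if (store.writeIfVersion(updated, snapshot.version)) {
        return attempt;
      }
    }
    throw new IllegalStateException("Failed to update after " + maxAttempts + " attempts");
  }
}
```

       Validation still runs inside the loop because it depends on the freshly read record, but its failure is reported as an exception rather than as a retriable result.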




[GitHub] [incubator-pinot] mcvsubbu commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662753137


   > > > > > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > > > > > This way, we don't assume that if source segments is null, then the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > > > > > 
   > > > > > > > > > > 
   > > > > > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > > > > > 
   > > > > > > > > > 
   > > > > > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > > > > > 
   > > > > > > > > 
   > > > > > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > > > > > 
   > > > > > > > 
   > > > > > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > > > > > > 
   > > > > > > 
   > > > > > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > > > > > 
   > > > > > 
   > > > > > I think that the confusion may come from the naming, `batchUpload`. There was a different design for batch upload protocol a while ago that was aiming to reduce the data inconsistency during the segment push (this design was about using batchId for upload, keep all segments under batchId, and update metadata at once in the end)
   > > > > > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > > > > > @mcvsubbu I put some thoughts on this and it's likely that we won't change parameters for these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive API as a part of the step. In that sense, we don't need to associate higher level operation to the lower level primitive.
   > > > > > e.g.
   > > > > > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > > > > > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   > > > > 
   > > > > 
   > > > > When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   > > > > If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments` It accommodates so many different options in the same API: refresh vs add, uri vs actual segment, encryption at source/sink , encryption scheme by source/tableschema ....
   > > > > Now imagine if you were to write code for the API and you knew what exactly the intention of the API invoker was right in the beginning. Would the code not be simpler?
   > > > > My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or support a generic code that handles all possible operations. as in : if (operation == merge) mergeSegments(); else if (operation == batchUpload()) batchUpload(); else throw IllegalOperation()
   > > > 
   > > > 
   > > > I would argue another way around, where API should be easy to use, instead of asking the user to understand all the supported operations in order to use it.
   > > > Other than that, I don't see the behavior difference for the primitive for merge vs batch-upload because the semantic of the primitive is just replacing the segments, and it does not matter what is the purpose of using the primitive. For an example, when you use a lock, you wouldn't pass in an extra parameter stating that the lock is for preventing deadlock or solving race conditions.
   > > > Even if we need to add the operation (I don't see that happen), it is always easy to add extra query parameters, but removing them is impossible without backward-incompatible change.
   > > 
   > > 
   > > From what I have followed based on the code as well as the discussions here:
   > > 
   > > 1. This PR is just providing a primitive to replace m segments with n segments in a consistent way.
   > > 2. It has not considered future possible use cases that could be built using this primitive (eg versioning). However, it is clear and complete in its definition, and doesn't expect to be modified for future usage.
   > > 3. Any future possible usage of these primitives will create functionality that wraps the primitive as opposed to directly modifying the primitive's code itself (e.g., add new `if` blocks in the primitive).
   > > 
   > > Based on my understanding above, and the fact that future usage is unclear (even though it is clear that future usage is not allowed to change the primitive itself), I don't see a strong need to tie the primitive with an operation.
   > 
   > @mayankshriv Thank you for the summary. I also prefer not to tie the primitives with the operation for now because the current primitive is complete by itself and it does not have any merge specific logic. When other operations start to use this primitive and if it becomes inevitable to add the operation specific logics, we can easily add the extra operation query parameter to make the code cleaner as @mcvsubbu suggested.
   > 
   > @mcvsubbu How do you think on this?
   
   That is the exact thing that I am trying to avoid.
   Let us say we introduce an "operation" query param later.
   We will need to code around the fact that "operation == null" means it is a segment merge.
   And then maybe go about deprecating null and making it mandatory over time.
   
   Sure, that can be done yes, but is that how we want it to evolve?
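
   To illustrate, a minimal sketch of what such a late-added parameter might look like. `OperationParamSketch` and `parse` are hypothetical names; only `MERGE` and `NEW_UPLOAD` come from the discussion above. The `null` branch is the part that would have to be carried for backward compatibility.

```java
import java.util.Locale;

// Hypothetical sketch of an optional "operation" query parameter added after the fact.
class OperationParamSketch {
  enum Operation { MERGE, NEW_UPLOAD }

  static Operation parse(String operationParam) {
    if (operationParam == null) {
      // Callers that predate the parameter send nothing, so absence has to mean MERGE.
      return Operation.MERGE;
    }
    // Unknown values are rejected with an IllegalArgumentException by valueOf.
    return Operation.valueOf(operationParam.trim().toUpperCase(Locale.ROOT));
  }
}
```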


[GitHub] [incubator-pinot] mcvsubbu commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662757591


   I'll take that, thanks.


[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457825742



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/BatchId.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+public class BatchId {

Review comment:
       removed




[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458213591



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String
+              .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                      + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                  lineageEntry.getSegmentsFrom(), finalSegmentsFrom));
+
+          // Check that merged segments name cannot be the same.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+              "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                  + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+              lineageEntry.getSegmentsTo(), segmentsTo));
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param batchId
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        Preconditions.checkArgument(segmentLineageZNRecord != null, String
+            .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                batchId));
+        segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+        expectedVersion = segmentLineageZNRecord.getVersion();
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        Preconditions.checkArgument(lineageEntry != null,
+            String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId));
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        Preconditions.checkArgument(segmentsForTable.containsAll(lineageEntry.getSegmentsTo()), String.format(
+            "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable));
+
+        // No-op if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.warn("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",
+              tableNameWithType, batchId);
+          return true;
+        }
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED,
+                System.currentTimeMillis());
+        segmentLineage.updateLineageEntry(batchId, newLineageEntry);
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, batchId = %s)", tableNameWithType, batchId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("endBatchUpload is successfully processed. (tableNameWithType = {}, batchId = {})", tableNameWithType,

Review comment:
       Do you want to include lineage in the log?
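
For readers following the diff above, here is a minimal sketch of the read-modify-write-with-retry pattern that the Javadoc describes, with the lineage entry included in the success message as this comment asks. It is an illustration only: `VersionedLineageStore`, `LineageUpdateSketch`, and `MAX_ATTEMPTS` are hypothetical stand-ins (an in-memory map with a version counter), not Pinot's property store, `SegmentLineageAccessHelper`, or retry policy.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for the property store; not Pinot's API.
class VersionedLineageStore {
  private Map<String, String> _entries = new HashMap<>(); // batchId -> state
  private int _version = 0;

  // Copy of the entries together with the version they were read at.
  static final class Snapshot {
    final Map<String, String> entries;
    final int version;

    Snapshot(Map<String, String> entries, int version) {
      this.entries = entries;
      this.version = version;
    }
  }

  synchronized Snapshot read() {
    return new Snapshot(new HashMap<>(_entries), _version);
  }

  // The write succeeds only if nobody else wrote since 'expectedVersion' was read.
  synchronized boolean write(Map<String, String> entries, int expectedVersion) {
    if (expectedVersion != _version) {
      return false;
    }
    _entries = new HashMap<>(entries);
    _version++;
    return true;
  }
}

class LineageUpdateSketch {
  static final int MAX_ATTEMPTS = 3; // assumed retry budget

  // Marks the batch COMPLETED and returns a log message that includes the written entry state.
  static String endBatch(VersionedLineageStore store, String batchId) {
    for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
      VersionedLineageStore.Snapshot snapshot = store.read();
      String state = snapshot.entries.get(batchId);
      if (state == null) {
        throw new IllegalArgumentException("Invalid batch id: " + batchId);
      }
      if (state.equals("COMPLETED")) {
        return "already COMPLETED (batchId = " + batchId + ")";
      }
      snapshot.entries.put(batchId, "COMPLETED");
      if (store.write(snapshot.entries, snapshot.version)) {
        return "endBatchUpload processed (batchId = " + batchId + ", lineageEntry = COMPLETED)";
      }
      // The version moved underneath us: re-read and try again.
    }
    throw new IllegalStateException("Failed to update lineage after " + MAX_ATTEMPTS + " attempts");
  }
}
```

The version check is what makes the update safe under concurrent writers: a stale write is rejected rather than silently overwriting another batch's entry, and the loop re-reads before retrying.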

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +460,46 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      StartBatchUploadRequest startBatchUploadRequest) {
+    try {
+      String tableNameWithType =
+          TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      String batchId = _pinotHelixResourceManager
+          .startBatchUpload(tableNameWithType, startBatchUploadRequest.getSegmentsFrom(),
+              startBatchUploadRequest.getSegmentsTo());
+      return Response.ok(JsonUtils.newObjectNode().put("batchId", batchId)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @POST
+  @Path("segments/{tableName}/endBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "End the batch upload", notes = "End the batch upload")
+  public Response endBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Batch Id returned by startBatchUpload API") @QueryParam("batchId") String batchId) {
+    try {
+      String tableNameWithType =
+          TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      // Check that the batch id is valid
+      Preconditions.checkNotNull(batchId, "'batchId' cannot be null");

Review comment:
       ```suggestion
         Preconditions.checkNotNull(batchId, "'batchId' should not be null");
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +460,46 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      StartBatchUploadRequest startBatchUploadRequest) {
+    try {
+      String tableNameWithType =
+          TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+      String batchId = _pinotHelixResourceManager
+          .startBatchUpload(tableNameWithType, startBatchUploadRequest.getSegmentsFrom(),
+              startBatchUploadRequest.getSegmentsTo());
+      return Response.ok(JsonUtils.newObjectNode().put("batchId", batchId)).build();

Review comment:
       Will minion reference "batchId" to do automatic merges? If so, it would be useful to add this key to CommonConstants. Even if not, a shared constant helps any client software we build to interact with the controller, so I would suggest moving it to CommonConstants.
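
A minimal sketch of the shared-constant idea, assuming a hypothetical holder class: `BatchUploadConstants` and `BATCH_ID_KEY` are made-up names for illustration, and the real CommonConstants class may organize its keys differently. A plain map stands in for the JSON response body.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical constants holder; not the actual layout of Pinot's CommonConstants.
final class BatchUploadConstants {
  static final String BATCH_ID_KEY = "batchId";

  private BatchUploadConstants() {
  }
}

class BatchIdResponseSketch {
  // Controller side: build the response body using the shared key.
  static Map<String, String> buildResponse(String batchId) {
    Map<String, String> body = new LinkedHashMap<>();
    body.put(BatchUploadConstants.BATCH_ID_KEY, batchId);
    return body;
  }

  // Client side (for example a minion task): read the id back using the same key.
  static String extractBatchId(Map<String, String> body) {
    String batchId = body.get(BatchUploadConstants.BATCH_ID_KEY);
    if (batchId == null) {
      throw new IllegalArgumentException("Response is missing '" + BatchUploadConstants.BATCH_ID_KEY + "'");
    }
    return batchId;
  }
}
```

With both sides referring to the same constant, renaming the key is a one-line change and a typo in the key becomes a compile error instead of a silent null.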

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String

Review comment:
       Can we move this check to the caller? Or, do you expect multiple callers to this API?






[GitHub] [incubator-pinot] mcvsubbu commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662529706


   > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > 
   > > > > > > 
   > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > 
   > > > > > 
   > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > that way, even if some other set of arguments need to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > 
   > > > > 
   > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > 
   > > > 
   > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level API can leverage this. WDYT?
   > > 
   > > 
   > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > 
   > I think that the confusion may come from the naming, `batchUpload`. There was a different design for batch upload protocol a while ago that was aiming to reduce the data inconsistency during the segment push (this design was about using batchId for upload, keep all segments under batchId, and update metadata at once in the end)
   > 
   > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > 
   > @mcvsubbu I put some thoughts on this and it's likely that we won't change parameters for these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive API as a part of the step. In that sense, we don't need to associate higher level operation to the lower level primitive.
   > 
   > e.g.
   > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   
   When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   
   If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments`. It accommodates so many different options in the same API: refresh vs add, URI vs actual segment, encryption at source/sink, encryption scheme by source/tableschema ...
   Now imagine you were writing the code for the API and knew exactly what the intention of the API invoker was right at the beginning. Would the code not be simpler?
   My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or to support generic code that handles all possible operations, as in: if (operation == merge) mergeSegments(); else if (operation == batchUpload) batchUpload(); else throw IllegalOperation()
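
The dispatch described in the comment above could be sketched roughly as follows. This is an illustration of the proposal, not code from the PR: the `Operation` enum, its values, and the `start` method are hypothetical names taken from the discussion, and the per-operation bodies are placeholders that only return a description string.

```java
import java.util.List;
import java.util.Locale;

class OperationDispatchSketch {
  // Hypothetical operation names from the discussion; not an actual Pinot enum.
  enum Operation {
    MERGE, BATCH_UPLOAD
  }

  // Parses the caller-supplied operation and routes to operation-specific handling.
  static String start(String operation, List<String> segmentsFrom, List<String> segmentsTo) {
    if (operation == null) {
      throw new IllegalArgumentException("'operation' must be specified");
    }
    Operation op;
    try {
      op = Operation.valueOf(operation.toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException("Illegal operation: " + operation, e);
    }
    switch (op) {
      case MERGE:
        // A merge needs something to merge, so empty source segments are rejected here.
        if (segmentsFrom.isEmpty()) {
          throw new IllegalArgumentException("MERGE requires non-empty 'segmentsFrom'");
        }
        return "MERGE: replace " + segmentsFrom.size() + " segment(s) with " + segmentsTo.size();
      case BATCH_UPLOAD:
        // A new upload has no source segments; only the target list matters.
        return "BATCH_UPLOAD: add " + segmentsTo.size() + " segment(s)";
      default:
        throw new IllegalStateException("Unhandled operation: " + op);
    }
  }
}
```

The trade-off debated in this thread is visible here: the explicit operation lets each branch carry its own validation, while the counter-position is that a single replace primitive needs no branching at all because its behavior is the same for every caller.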




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457845842



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {

Review comment:
       I have changed all checks to use `Preconditions`
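
As a rough illustration of what precondition-style input checks look like for this API, the sketch below mirrors the three validations visible in the diff (non-empty `segmentsTo`, all `segmentsFrom` present in the table, no `segmentsTo` already present). It is stdlib-only: the local `checkArgument` helper is a stand-in with the same contract as Guava's `Preconditions.checkArgument(boolean, Object)`, and `ReplaceValidationSketch` is a hypothetical name. It also assumes `segmentsFrom` is non-null, which the PR may handle differently.

```java
import java.util.Collections;
import java.util.List;
import java.util.Set;

class ReplaceValidationSketch {
  // Stand-in for Guava's Preconditions.checkArgument: throws IllegalArgumentException when false.
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  // Validates the user inputs against the segments currently in the table.
  static void validate(Set<String> segmentsInTable, List<String> segmentsFrom, List<String> segmentsTo) {
    checkArgument(segmentsTo != null && !segmentsTo.isEmpty(), "'segmentsTo' cannot be empty");
    checkArgument(segmentsFrom != null, "'segmentsFrom' cannot be null (assumption of this sketch)");
    // Every segment to be replaced must already exist in the table.
    checkArgument(segmentsInTable.containsAll(segmentsFrom),
        "Not all segments from 'segmentsFrom' are available in the table: " + segmentsFrom);
    // None of the replacement segments may exist in the table yet.
    checkArgument(Collections.disjoint(segmentsInTable, segmentsTo),
        "Segments from 'segmentsTo' should not be available in the table yet: " + segmentsTo);
  }
}
```

Each failed check surfaces as an `IllegalArgumentException` with a message naming the offending input, which is the behavior the `Preconditions`-based version in the diff relies on.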






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457840296



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +459,45 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)
+      throws IOException {
+    StartBatchUploadRequest request = JsonUtils.stringToObject(body, StartBatchUploadRequest.class);
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+    try {
+      String batchId = _pinotHelixResourceManager
+          .startBatchUpload(tableNameWithType, request.getSegmentsFrom(), request.getSegmentsTo());
+      return Response.ok(new BatchId(batchId)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @POST
+  @Path("segments/{tableName}/endBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "End the batch upload", notes = "End the batch upload")
+  public Response endBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)

Review comment:
       fixed






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457842868



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartBatchUploadRequest.java
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+public class StartBatchUploadRequest {
+  private List<String> _segmentsFrom;
+  private List<String> _segmentsTo;
+
+  public StartBatchUploadRequest(@JsonProperty("segmentsFrom") List<String> segmentsFrom,

Review comment:
       added default & null check






[GitHub] [incubator-pinot] snleee commented on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-663335958


   #2715




[GitHub] [incubator-pinot] mcvsubbu commented on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662155835


   > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > 
   > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   
   If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   
   But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want. 
   
   That way, even if some other set of arguments need to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458408605



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String

Review comment:
       The caller of this function is the API entry point. I put all the validations in the same function. Can you explain a bit more on why you think moving this check to the caller is better?






[GitHub] [incubator-pinot] snleee edited a comment on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
snleee edited a comment on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662194364


   
   
   > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > 
   > > > > > 
   > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > 
   > > > > 
   > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > 
   > > > 
   > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > 
   > > 
   > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > 
   > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   
   I think that the confusion may come from the name `batchUpload`. A while ago there was a different design for a batch upload protocol that aimed to reduce data inconsistency during the segment push (that design used a batchId for the upload, kept all segments under the batchId, and updated the metadata all at once at the end).
   
   The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   
   @mcvsubbu I gave this some thought, and it's likely that we won't change the parameters of these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive as one of their steps. In that sense, we don't need to associate the higher-level operation with the lower-level primitive.
   
   e.g. 
   Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
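   
   As a rough illustration of the flow described above (not code from this PR), a caller such as a minion task could drive the two primitives as sketched below. `ReplaceSegmentsSketch`, `uploadSegment` and `stateOf` are hypothetical in-memory stand-ins for the controller and the segment upload path; only the names `startReplaceSegments`/`endReplaceSegments`, `segmentsFrom`/`segmentsTo` and the `IN_PROGRESS`/`COMPLETED` states come from this thread.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Map;
   import java.util.Set;
   import java.util.UUID;

   // Hypothetical in-memory stand-in for the controller; not Pinot code.
   final class ReplaceSegmentsSketch {
     enum State { IN_PROGRESS, COMPLETED }

     private static final class Entry {
       final List<String> _segmentsFrom;
       final List<String> _segmentsTo;
       State _state = State.IN_PROGRESS;

       Entry(List<String> segmentsFrom, List<String> segmentsTo) {
         _segmentsFrom = new ArrayList<>(segmentsFrom);
         _segmentsTo = new ArrayList<>(segmentsTo);
       }
     }

     private final Set<String> _tableSegments = new HashSet<>();
     private final Map<String, Entry> _lineage = new HashMap<>();

     // Stand-in for a regular segment upload.
     void uploadSegment(String segmentName) {
       _tableSegments.add(segmentName);
     }

     // Registers the intent to replace segmentsFrom with segmentsTo and returns an id for it.
     String startReplaceSegments(List<String> segmentsFrom, List<String> segmentsTo) {
       if (segmentsTo.isEmpty()) {
         throw new IllegalArgumentException("'segmentsTo' cannot be empty");
       }
       if (!_tableSegments.containsAll(segmentsFrom)) {
         throw new IllegalArgumentException("Not all segments from 'segmentsFrom' exist in the table");
       }
       if (!Collections.disjoint(_tableSegments, segmentsTo)) {
         throw new IllegalArgumentException("Segments from 'segmentsTo' must not exist in the table yet");
       }
       String id = UUID.randomUUID().toString();
       _lineage.put(id, new Entry(segmentsFrom, segmentsTo));
       return id;
     }

     // Marks the replacement as completed once every segment in segmentsTo has been uploaded.
     void endReplaceSegments(String id) {
       Entry entry = _lineage.get(id);
       if (entry == null) {
         throw new IllegalArgumentException("Invalid id: " + id);
       }
       if (!_tableSegments.containsAll(entry._segmentsTo)) {
         throw new IllegalArgumentException("Not all segments from 'segmentsTo' exist in the table");
       }
       entry._state = State.COMPLETED;
     }

     State stateOf(String id) {
       return _lineage.get(id)._state;
     }

     public static void main(String[] args) {
       ReplaceSegmentsSketch controller = new ReplaceSegmentsSketch();
       controller.uploadSegment("seg_1");
       controller.uploadSegment("seg_2");

       // The caller announces the replacement, uploads the new segment, then closes it out.
       String id = controller.startReplaceSegments(Arrays.asList("seg_1", "seg_2"), Arrays.asList("merged_1"));
       System.out.println(controller.stateOf(id));
       controller.uploadSegment("merged_1");
       controller.endReplaceSegments(id);
       System.out.println(controller.stateOf(id));
     }
   }
   ```
   
   The same three calls would serve a backfill job equally well, which is the point being made here: the caller decides why it is replacing segments, and the primitive does not need to know.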
   




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458408605



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String

Review comment:
       I moved this to the constructor of `StartBatchUploadRequest`






[GitHub] [incubator-pinot] snleee commented on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-660224442


   For more explanation, see:
   https://docs.google.com/document/d/1-AKCfXNXdoNjFIvJ87wjWwFM_38gS0NCwFrIYjYsqp8/edit#
   




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457825393



##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -108,7 +120,9 @@ public static SegmentLineage fromZNRecord(ZNRecord record) {
       String lineageId = listField.getKey();
       List<String> value = listField.getValue();
       Preconditions.checkState(value.size() == 4);
-      List<String> segmentsFrom = Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
+      String segmentsFromStr = value.get(0);
+      List<String> segmentsFrom = (segmentsFromStr == null || segmentsFromStr.length() == 0) ? new ArrayList<>()
+          : Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
       List<String> segmentsTo = Arrays.asList(value.get(1).split(COMMA_SEPARATOR));

Review comment:
       changed to `StringUtils.split()`.
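   
   For context, the empty-string guard in the diff above exists because `String.split` on an empty string returns a one-element array containing `""` rather than an empty array. The sketch below shows that pitfall using only the JDK; `parseSegments` is a hypothetical helper written for illustration and is not the code in this PR.
   
   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   final class SplitPitfall {
     // Hypothetical helper, not Pinot code: parses a comma-separated segment list,
     // treating null or "" as "no segments".
     static List<String> parseSegments(String value) {
       if (value == null || value.isEmpty()) {
         return new ArrayList<>();
       }
       return new ArrayList<>(Arrays.asList(value.split(",")));
     }

     public static void main(String[] args) {
       // Splitting an empty string yields one empty element, not zero elements.
       System.out.println("".split(",").length);        // 1
       System.out.println(parseSegments("").size());    // 0
       System.out.println(parseSegments("seg1,seg2"));  // [seg1, seg2]
     }
   }
   ```
   
   Without the guard, an empty `segmentsFrom` field would be read back as a list holding a single empty segment name.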






[GitHub] [incubator-pinot] mayankshriv edited a comment on pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
mayankshriv edited a comment on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662748149


   > > > > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > > > > This way, we don't assume that if source segments is null, the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > > > > 
   > > > > > > > > 
   > > > > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > > > > 
   > > > > > > > 
   > > > > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > > > > 
   > > > > > > 
   > > > > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > > > > 
   > > > > > 
   > > > > > If that's the case, then I agree with Subbu. It's better to name this primitive as ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > > > > 
   > > > > 
   > > > > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   > > > 
   > > > 
   > > > I think that the confusion may come from the name `batchUpload`. A while ago there was a different design for a batch upload protocol that aimed to reduce data inconsistency during the segment push (that design used a batchId for the upload, kept all segments under the batchId, and updated the metadata all at once at the end).
   > > > The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   > > > @mcvsubbu I gave this some thought, and it's likely that we won't change the parameters of these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this replacement primitive as one of their steps. In that sense, we don't need to associate the higher-level operation with the lower-level primitive.
   > > > e.g.
   > > > Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   > > > Batch Upload/Batch Replacement(backfill) -> Pinot build and push job will call these APIs to upload/backfill segments
   > > 
   > > 
   > > When you say "likely", that means we don't have a clear idea. The parameters may change in some way.
   > > If you want to look at how complex coding gets when the same API is used for multiple operations, you don't have to look far. Just look at `/segments`. It accommodates so many different options in the same API: refresh vs add, uri vs actual segment, encryption at source/sink, encryption scheme by source/tableschema ....
   > > Now imagine if you were to write code for the API and you knew what exactly the intention of the API invoker was right in the beginning. Would the code not be simpler?
   > > My argument is simple. If you are using this for multiple operations, and the invoker knows which operation they are going to perform, let the invoker specify the operation. We then have the flexibility to keep the code simple, or support generic code that handles all possible operations, as in: if (operation == merge) mergeSegments(); else if (operation == batchUpload) batchUpload(); else throw IllegalOperation()
   > 
   > I would argue the other way around: the API should be easy to use, instead of asking the user to understand all the supported operations in order to use it.
   > Other than that, I don't see a behavior difference for the primitive between merge and batch upload, because the semantic of the primitive is just replacing the segments, and the purpose of using the primitive does not matter. For example, when you use a lock, you wouldn't pass in an extra parameter stating that the lock is for preventing deadlock or solving race conditions.
   > Even if we need to add the operation (I don't see that happening), it is always easy to add extra query parameters, but removing them is impossible without a backward-incompatible change.
   
   From what I have followed based on the code as well as the discussions here:
   1. This PR is just providing a primitive to replace m segments with n segments in a consistent way.
   2. It has not considered future possible use cases that could be built using this primitive (e.g., versioning). However, it is clear and complete in its definition, and doesn't expect to be modified for future usage.
   3. Any future possible usage of these primitives will create functionality that wraps the primitive as opposed to directly modifying the primitive's code itself (e.g., add new `if` blocks in the primitive).
   
   Based on my understanding above, and the fact that future usage is unclear (even though it is clear that future usage is not allowed to change the primitive itself), I don't see a strong need to tie the primitive with an operation.




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r457843510



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exist in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segments name cannot be the same for different lineage entry
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                    + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+                lineageEntry.getSegmentsTo(), segmentsTo);
+            throw new IllegalArgumentException(errorMsg);
+          }
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param batchId
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    // Check that the batch id is valid
+    if (batchId == null || batchId.isEmpty()) {
+      throw new IllegalArgumentException("'batchId' cannot be null or empty");
+    }
+
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          String errorMsg = String
+              .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                  batchId);
+          throw new IllegalArgumentException(errorMsg);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        if (lineageEntry == null) {
+          String errorMsg =
+              String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        if (!segmentsForTable.containsAll(lineageEntry.getSegmentsTo())) {
+          String errorMsg = String.format(
+              "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                  + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // No-op if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.info("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",
+              tableNameWithType, batchId);
+          return true;
+        }
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED,
+                System.currentTimeMillis());
+        segmentLineage.addLineageEntry(batchId, newLineageEntry);
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, batchId = %s)", tableNameWithType, batchId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }

Review comment:
       added
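   
   The Javadoc in the quoted diff describes the update as "retry logic along with read-modify-write block", and the code passes an `expectedVersion` back to the write. A minimal sketch of that optimistic pattern, assuming a versioned store that rejects writes made against a stale version, might look like the following. `VersionedStore`, `Snapshot` and `update` are hypothetical names for illustration; they are not the Helix property store or Pinot's retry policy.
   
   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;
   import java.util.function.UnaryOperator;

   // Hypothetical versioned store; stands in for a record that carries a version number.
   final class VersionedStore {
     static final class Snapshot {
       final List<String> value;
       final int version;

       Snapshot(List<String> value, int version) {
         this.value = Collections.unmodifiableList(new ArrayList<>(value));
         this.version = version;
       }
     }

     private List<String> _value = new ArrayList<>();
     private int _version = 0;

     synchronized Snapshot read() {
       return new Snapshot(_value, _version);
     }

     // Compare-and-set: the write succeeds only if nobody wrote since expectedVersion was read.
     synchronized boolean write(List<String> newValue, int expectedVersion) {
       if (expectedVersion != _version) {
         return false;
       }
       _value = new ArrayList<>(newValue);
       _version++;
       return true;
     }

     // Read-modify-write with a bounded number of attempts; each attempt re-reads the latest state.
     static boolean update(VersionedStore store, UnaryOperator<List<String>> modify, int maxAttempts) {
       for (int attempt = 0; attempt < maxAttempts; attempt++) {
         Snapshot snapshot = store.read();
         List<String> updated = modify.apply(new ArrayList<>(snapshot.value));
         if (store.write(updated, snapshot.version)) {
           return true;
         }
       }
       return false;
     }

     public static void main(String[] args) {
       VersionedStore store = new VersionedStore();
       Snapshot stale = store.read();

       update(store, entries -> { entries.add("entry-1"); return entries; }, 3);

       // A write based on the stale snapshot is rejected instead of overwriting entry-1.
       System.out.println(store.write(Collections.singletonList("lost-update"), stale.version));

       update(store, entries -> { entries.add("entry-2"); return entries; }, 3);
       System.out.println(store.read().value);
     }
   }
   ```
   
   Because validation runs inside each attempt, a concurrent writer that adds a conflicting lineage entry would be seen on the retry rather than silently overwritten.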






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458506683



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear twice.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), String
+              .format("It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                      + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                  lineageEntry.getSegmentsFrom(), finalSegmentsFrom));
+
+          // Check that merged segments name cannot be the same.
+          Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo), String.format(
+              "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                  + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+              lineageEntry.getSegmentsTo(), segmentsTo));
+        }
+

Review comment:
       This check is actually covered by the initial validation
   
   ```
       // Check that all the segments from 'segmentsFrom' exist in the table
       Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
       Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom));
   
       // Check that all the segments from 'segmentTo' does not exist in the table.
       Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo));
   ```
   `segmentsForTable` is a superset of `segmentsFrom`, so checking `Collections.disjoint(segmentsForTable, segmentsTo)` already covers the check `Collections.disjoint(segmentsTo, segmentsFrom)`.
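   
   To make the set reasoning concrete, here is a small JDK-only sketch (the class and method names are hypothetical, chosen for illustration): once both up-front checks pass, no segment can appear in both `segmentsFrom` and `segmentsTo`, so an overlapping request is rejected without a third check.
   
   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   final class DisjointCheck {
     // True when both up-front validations quoted above pass.
     static boolean passesInitialValidation(Set<String> segmentsForTable, List<String> segmentsFrom,
         List<String> segmentsTo) {
       return segmentsForTable.containsAll(segmentsFrom) && Collections.disjoint(segmentsForTable, segmentsTo);
     }

     public static void main(String[] args) {
       Set<String> table = new HashSet<>(Arrays.asList("s1", "s2", "s3"));
       List<String> from = Arrays.asList("s1", "s2");

       // No overlap between segmentsFrom and segmentsTo: accepted.
       System.out.println(passesInitialValidation(table, from, Arrays.asList("merged_1")));    // true

       // "s1" is in both lists; it is in the table, so the disjoint check already rejects it.
       System.out.println(passesInitialValidation(table, from, Arrays.asList("s1", "merged_1")));  // false
     }
   }
   ```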






[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
Jackie-Jiang commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r456725078



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {

Review comment:
       Perform null check within the request?

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -108,7 +120,9 @@ public static SegmentLineage fromZNRecord(ZNRecord record) {
       String lineageId = listField.getKey();
       List<String> value = listField.getValue();
       Preconditions.checkState(value.size() == 4);
-      List<String> segmentsFrom = Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
+      String segmentsFromStr = value.get(0);
+      List<String> segmentsFrom = (segmentsFromStr == null || segmentsFromStr.length() == 0) ? new ArrayList<>()
+          : Arrays.asList(value.get(0).split(COMMA_SEPARATOR));
       List<String> segmentsTo = Arrays.asList(value.get(1).split(COMMA_SEPARATOR));

Review comment:
       Can `segmentsFromStr` be null?
   Also avoid using the regex split
   ```suggestion
         List<String> segmentsFrom = Arrays.asList(StringUtils.split(value.get(0), ','));
         List<String> segmentsTo = Arrays.asList(StringUtils.split(value.get(1), ','));
   ```
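
To show why the empty-string case matters here, below is a minimal sketch that uses only the JDK. The helper `splitSegments` is hypothetical and stands in for the commons-lang call in the suggestion; as far as I recall from the commons-lang documentation, `StringUtils.split("", ',')` returns an empty array and `StringUtils.split(null, ',')` returns null, so the null case may still need a guard.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not code from the PR.
final class SegmentListParsingSketch {

  // Splits a comma-separated segment list without String.split().
  // Null or empty input yields an empty list; empty tokens are skipped.
  static List<String> splitSegments(String value) {
    List<String> segments = new ArrayList<>();
    if (value == null || value.isEmpty()) {
      return segments;
    }
    int start = 0;
    for (int i = 0; i <= value.length(); i++) {
      if (i == value.length() || value.charAt(i) == ',') {
        if (i > start) {
          segments.add(value.substring(start, i));
        }
        start = i + 1;
      }
    }
    return segments;
  }

  public static void main(String[] args) {
    // String.split() on an empty string returns one empty token, which would
    // deserialize an empty 'segmentsFrom' as a list containing "".
    System.out.println("".split(",").length);
    System.out.println(splitSegments(""));
    System.out.println(splitSegments("seg_0,seg_1"));
  }
}
```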

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/StartBatchUploadRequest.java
##########
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+
+
+public class StartBatchUploadRequest {
+  private List<String> _segmentsFrom;
+  private List<String> _segmentsTo;
+
+  public StartBatchUploadRequest(@JsonProperty("segmentsFrom") List<String> segmentsFrom,

Review comment:
       Set defaults, or check that they cannot be null?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +459,45 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)

Review comment:
       You can directly put `StartBatchUploadRequest` as the parameter
   ```suggestion
         @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, StartBatchUploadRequest request)
   ```

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/lineage/SegmentLineage.java
##########
@@ -70,6 +71,17 @@ public String addLineageEntry(LineageEntry lineageEntry) {
     return lineageId;
   }
 
+  /**
+   * Add lineage entry to the segment lineage metadata with the given lineage entry id
+   * @param lineageEntryId the id for the lineage entry
+   * @param lineageEntry a lineage entry
+   * @return the id for the input lineage entry for the access
+   */
+  public String addLineageEntry(String lineageEntryId, LineageEntry lineageEntry) {

Review comment:
       Keep only `String addLineageEntry(LineageEntry lineageEntry)` or `void addLineageEntry(String lineageEntryId, LineageEntry lineageEntry)`; don't keep both, because it will be confusing. In other words, always generate the UUID either inside or outside of this class.
   
   My suggestion is to keep `String addLineageEntry(LineageEntry lineageEntry)`; you can check for the existence of the `lineageId` (or skip the check, because the chance of a UUID collision is very low)
   
   In order to update the lineage entry, you can add a method `void updateLineageEntry(String lineageEntryId, LineageEntry lineageEntry)`
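
A minimal sketch of the shape this suggestion points at, under the assumption that the id is always generated inside the class. `LineageSketch` and its generic entry type are hypothetical simplifications, not the real `SegmentLineage` or `LineageEntry` classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical illustration, not code from the PR.
final class LineageSketch<E> {
  private final Map<String, E> _entries = new HashMap<>();

  // The id is always generated here, so callers never pass one in when adding.
  String addLineageEntry(E lineageEntry) {
    String lineageEntryId = UUID.randomUUID().toString();
    if (_entries.putIfAbsent(lineageEntryId, lineageEntry) != null) {
      throw new IllegalStateException("Lineage entry id already exists: " + lineageEntryId);
    }
    return lineageEntryId;
  }

  // Updating requires an id that was returned earlier by addLineageEntry().
  void updateLineageEntry(String lineageEntryId, E lineageEntry) {
    if (_entries.replace(lineageEntryId, lineageEntry) == null) {
      throw new IllegalArgumentException("Unknown lineage entry id: " + lineageEntryId);
    }
  }

  E getLineageEntry(String lineageEntryId) {
    return _entries.get(lineageEntryId);
  }
}
```

With this split, a start call would use `addLineageEntry` and hand the returned id back to the client, and an end call would use `updateLineageEntry` with that id.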

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {

Review comment:
       For readability
   ```suggestion
             Preconditions.checkArgument(Collections.disjoint(lineageEntry.getSegmentsFrom(), finalSegmentsFrom), ...)
   ```

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {

Review comment:
       Can be simplified with `Preconditions.checkArgument()`, same for other validation checks

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segments name cannot be the same for different lineage entry
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {

Review comment:
       Need to check both `Collections.disjoint(lineageEntry.getSegmentsFrom(), segmentsTo)` and `Collections.disjoint(lineageEntry.getSegmentsTo(), segmentsTo)`
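
The two checks named here could be combined as in the following sketch. The class and method names are hypothetical, and plain lists stand in for the `LineageEntry` accessors.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration, not code from the PR.
final class SegmentsToConflictSketch {

  // Returns true only when the requested 'segmentsTo' shares no name with either side
  // of an existing lineage entry.
  static boolean isSegmentsToValid(List<String> existingSegmentsFrom, List<String> existingSegmentsTo,
      List<String> requestedSegmentsTo) {
    return Collections.disjoint(existingSegmentsFrom, requestedSegmentsTo)
        && Collections.disjoint(existingSegmentsTo, requestedSegmentsTo);
  }

  public static void main(String[] args) {
    List<String> existingFrom = Arrays.asList("s1", "s2");
    List<String> existingTo = Arrays.asList("merged_0");

    // Reuses a name from the left side of an existing entry.
    System.out.println(isSegmentsToValid(existingFrom, existingTo, Arrays.asList("s1")));
    // Reuses a name from the right side of an existing entry.
    System.out.println(isSegmentsToValid(existingFrom, existingTo, Arrays.asList("merged_0")));
    // Uses a fresh name.
    System.out.println(isSegmentsToValid(existingFrom, existingTo, Arrays.asList("merged_1")));
  }
}
```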

##########
File path: pinot-common/src/main/java/org/apache/pinot/common/restlet/resources/BatchId.java
##########
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.restlet.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+public class BatchId {

Review comment:
       Any reason for introducing this object? I feel it will be easier to use if we directly use a string batch id instead of a JSON batch id

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {

Review comment:
       Can be simplified with `Preconditions.checkArgument()`

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
##########
@@ -455,6 +459,45 @@ public void uploadSegmentAsMultiPartV2(FormDataMultiPart multiPart,
     }
   }
 
+  @POST
+  @Path("segments/{tableName}/startBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Start the batch upload", notes = "Start the batch upload")
+  public Response startBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)
+      throws IOException {
+    StartBatchUploadRequest request = JsonUtils.stringToObject(body, StartBatchUploadRequest.class);
+    String tableNameWithType =
+        TableNameBuilder.forType(TableType.valueOf(tableTypeStr.toUpperCase())).tableNameWithType(tableName);
+    try {
+      String batchId = _pinotHelixResourceManager
+          .startBatchUpload(tableNameWithType, request.getSegmentsFrom(), request.getSegmentsTo());
+      return Response.ok(new BatchId(batchId)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
+  @POST
+  @Path("segments/{tableName}/endBatchUpload")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "End the batch upload", notes = "End the batch upload")
+  public Response endBatchUpload(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, String body)

Review comment:
       Send batchId as query param?

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that any segment from 'segmentsFrom' does not appear on the left side.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segments name cannot be the same for different lineage entry
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                    + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+                lineageEntry.getSegmentsTo(), segmentsTo);
+            throw new IllegalArgumentException(errorMsg);
+          }
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param batchId
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    // Check that the batch id is valid
+    if (batchId == null || batchId.isEmpty()) {
+      throw new IllegalArgumentException("'batchId' cannot be null or empty");
+    }
+
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          String errorMsg = String
+              .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                  batchId);
+          throw new IllegalArgumentException(errorMsg);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        if (lineageEntry == null) {
+          String errorMsg =
+              String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        if (!segmentsForTable.containsAll(lineageEntry.getSegmentsTo())) {
+          String errorMsg = String.format(
+              "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                  + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // NO-OPS if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.info("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",

Review comment:
       Probably WARN? This is unexpected. Also, move this in front of the segments check

##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,190 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Bath Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    if (segmentsTo == null || segmentsTo.isEmpty()) {
+      String errorMsg = String
+          .format("'segmentsTo' cannot be null or empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s'",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    // segmentsFrom can be empty in case of the initial upload
+    if (segmentsFrom == null) {
+      segmentsFrom = new ArrayList<>();
+    }
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    if (!segmentsForTable.containsAll(segmentsFrom)) {
+      String errorMsg = String.format(
+          "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+              + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+          segmentsForTable);
+      throw new IllegalArgumentException(errorMsg);
+    }
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exists in the segment lineage
+        if (segmentLineage.getLineageEntry(batchId) != null) {
+          String errorMsg = String.format("BatchId (%s) already exists in the segment lineage.", batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        for (String lineageEntryId : segmentLineage.getLineageEntryIds()) {
+          LineageEntry lineageEntry = segmentLineage.getLineageEntry(lineageEntryId);
+
+          // Check that no segment from 'segmentsFrom' already appears in the 'segmentsFrom' of an existing entry.
+          if (lineageEntry.getSegmentsFrom().stream().anyMatch(finalSegmentsFrom::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to merge segments that are already merged. (tableName = %s, segmentsFrom from "
+                    + "existing lineage entry = %s, requested segmentsFrom = %s)", tableNameWithType,
+                lineageEntry.getSegmentsFrom(), finalSegmentsFrom);
+            throw new IllegalArgumentException(errorMsg);
+          }
+
+          // Check that merged segment names are not reused across different lineage entries
+          if (lineageEntry.getSegmentsTo().stream().anyMatch(segmentsTo::contains)) {
+            String errorMsg = String.format(
+                "It is not allowed to have the same segment name for merged segments. (tableName = %s, segmentsTo from "
+                    + "existing lineage entry = %s, requested segmentsTo = %s)", tableNameWithType,
+                lineageEntry.getSegmentsTo(), segmentsTo);
+            throw new IllegalArgumentException(errorMsg);
+          }
+        }
+
+        // Update lineage entry
+        segmentLineage.addLineageEntry(batchId,
+            new LineageEntry(finalSegmentsFrom, segmentsTo, LineageEntryState.IN_PROGRESS, System.currentTimeMillis()));
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed while updating the segment lineage. (tableName = %s, segmentsFrom = %s, segmentsTo = %s)",
+              tableNameWithType, segmentsFrom, segmentsTo);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }
+
+    // Only successful attempt can reach here
+    LOGGER.info("startBatchUpload is successfully processed. (tableNameWithType = {}, segmentsFrom = {}, "
+        + "segmentsTo = {}, batchId = {})", tableNameWithType, segmentsFrom, segmentsTo, batchId);
+    return batchId;
+  }
+
+  /**
+   * Computes the end batch upload phase
+   *
+   * 1. Compute validation
+   * 2. Update the lineage entry state to "COMPLETED" and write metadata to the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType
+   * @param batchId
+   */
+  public void endBatchUpload(String tableNameWithType, String batchId) {
+    // Check that the batch id is valid
+    if (batchId == null || batchId.isEmpty()) {
+      throw new IllegalArgumentException("'batchId' cannot be null or empty");
+    }
+
+    try {
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          String errorMsg = String
+              .format("Segment lineage does not exist. (tableNameWithType = '%s', batchId = '%s')", tableNameWithType,
+                  batchId);
+          throw new IllegalArgumentException(errorMsg);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Look up the lineage entry based on the batch id
+        LineageEntry lineageEntry = segmentLineage.getLineageEntry(batchId);
+        if (lineageEntry == null) {
+          String errorMsg =
+              String.format("Invalid batch id (tableName='%s', batchId='%s')", tableNameWithType, batchId);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // Check that all the segments from 'segmentsTo' exist in the table
+        Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+        if (!segmentsForTable.containsAll(lineageEntry.getSegmentsTo())) {
+          String errorMsg = String.format(
+              "Not all segments from 'segmentsTo' are available in the table. (tableName = '%s', segmentsTo = '%s', "
+                  + "segmentsFromTable = '%s')", tableNameWithType, lineageEntry.getSegmentsTo(), segmentsForTable);
+          throw new IllegalArgumentException(errorMsg);
+        }
+
+        // No-op if the entry is already completed
+        if (lineageEntry.getState() == LineageEntryState.COMPLETED) {
+          LOGGER.info("Lineage entry state is already COMPLETED. Nothing to update. (tableNameWithType={}, batchId={})",
+              tableNameWithType, batchId);
+          return true;
+        }
+
+        // Update lineage entry
+        LineageEntry newLineageEntry =
+            new LineageEntry(lineageEntry.getSegmentsFrom(), lineageEntry.getSegmentsTo(), LineageEntryState.COMPLETED,
+                System.currentTimeMillis());
+        segmentLineage.addLineageEntry(batchId, newLineageEntry);
+
+        // Write back to the lineage entry
+        return SegmentLineageAccessHelper.writeSegmentLineage(_propertyStore, segmentLineage, expectedVersion);
+      });
+    } catch (Exception e) {
+      String errorMsg = String
+          .format("Failed to update the segment lineage. (tableName = %s, batchId = %s)", tableNameWithType, batchId);
+      LOGGER.error(errorMsg, e);
+      throw new RuntimeException(errorMsg, e);
+    }

Review comment:
       Log something after it succeeds?






[GitHub] [incubator-pinot] snleee commented on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-662194364


   
   
   
   > > > > > > I think it is prudent to add an extra query parameter to the controller API for start/end batchupload. I suggest "operation" and make it string type. In this case, "Operation" will be set to "MERGE"
   > > > > > > This way, we don't assume that if source segments is null, then the operation MUST be new segment upload. Let us specify the operation clearly, and then it gets easier to evolve the API over other operations we may need.
   > > > > > 
   > > > > > 
   > > > > > @mcvsubbu We want to build primitive operations and we can use them to achieve multiple purpose such as batch upload, batch replace and version control. I don't think we should associate this primitive operation with any specific operation such as `MERGE` or `NEW_UPLOAD`. The semantic of this primitive operation is quite clear: replace the segments in `segmentsFrom` with segments in `segmentsTo` (we might want to rename the parameters if that makes the semantic more clear). @snleee Thoughts?
   > > > > 
   > > > > 
   > > > > If we know that this API will never be used for anything other than merge, then it is ok as defined. In that case, I would rename the API to say startBatchForMerge() or something like that. That way, if we want to introduce a startBatch for upload, we can do so.
   > > > > But then we know that this API will perhaps be re-used for batch upload of segments. Since we are reasonably sure that will be the case, but we don't know exactly what arguments it will take when we re-use it for another operation, it is best to specify the operation name we want.
   > > > > That way, even if some other set of arguments needs to be added, or some null assumptions do not hold true, we have the operation very clearly specified. Yes, we can always add the operation later on, and say that if "operation" is not present, then treat it as merge, but I think it is cleaner to specify that now.
   > > > 
   > > > 
   > > > This API will be used for merge, batch upload, batch replace, but I don't want to associate the API with certain type of operation because the semantic of this API is very clear and is independent of the actual operation. It will replace the segments in `segmentsFrom` with segments in `segmentsTo` atomically, and this will hold for all operations.
   > > 
   > > 
   > > If that's the case, then I agree with Subbu. It's better to name this primitive ReplaceSegments or SwapSegments. Then all higher-level APIs can leverage this. WDYT?
   > 
   > The `startBatchUpload` and `endBatchUpload` are the primitives we want to provide. I'm okay renaming them to something like `startReplacingSegments` and `endReplacingSegments`, but I don't want to make the primitives associate with any specific operation (taking an extra query param of "operation") because the behavior should be the same for all operations
   
   I think the confusion may come from the name `batchUpload`. An earlier design for a batch upload protocol aimed to reduce data inconsistency during segment push (that design used a batchId for the upload, kept all segments under the batchId, and updated the metadata all at once at the end).
   
   The current design is much simpler. This API is essentially a primitive that replaces m segments with n segments atomically. I like the `startReplaceSegments`/ `endReplaceSegments` suggestion because it directly indicates what the API is trying to do.
   
   @mcvsubbu I gave this some thought, and it's likely that we won't change the parameters of these APIs when we implement other operations (versioning/batch replacement/merge). Rather, those operations will use this API as one of their steps. In that sense, we don't need to associate a higher-level operation with the lower-level primitive.
   
   e.g. 
   Segment Merge -> minion will call these APIs when replacing old segments with merged segments.
   Batch Upload/Batch Replacement (backfill) -> Pinot build and push job will call these APIs to upload/backfill segments.
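
The start/end flow described above can be illustrated with a minimal in-memory sketch. Everything here is hypothetical: `ReplaceSegmentsSketch`, `uploadSegment` and `visibleSegments` are illustration-only names, not Pinot classes, and the visibility rule (hide `segmentsTo` while an entry is IN_PROGRESS, hide `segmentsFrom` once it is COMPLETED) is an assumption about how a reader of the lineage could get the atomic swap, not something this PR implements.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical in-memory stand-in for the controller; none of these names are Pinot classes.
class ReplaceSegmentsSketch {
  enum State { IN_PROGRESS, COMPLETED }

  static final class Entry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    State state = State.IN_PROGRESS;

    Entry(List<String> segmentsFrom, List<String> segmentsTo) {
      this.segmentsFrom = segmentsFrom;
      this.segmentsTo = segmentsTo;
    }
  }

  private final Set<String> tableSegments = new HashSet<>();
  private final Map<String, Entry> lineage = new LinkedHashMap<>();

  // Stands in for a regular segment upload.
  void uploadSegment(String segmentName) {
    tableSegments.add(segmentName);
  }

  // Validates the request and records an IN_PROGRESS lineage entry.
  String startReplaceSegments(List<String> segmentsFrom, List<String> segmentsTo) {
    if (!tableSegments.containsAll(segmentsFrom)) {
      throw new IllegalArgumentException("Not all segments from 'segmentsFrom' exist in the table");
    }
    if (!Collections.disjoint(tableSegments, segmentsTo)) {
      throw new IllegalArgumentException("Segments from 'segmentsTo' must not exist in the table yet");
    }
    String entryId = UUID.randomUUID().toString();
    lineage.put(entryId, new Entry(segmentsFrom, segmentsTo));
    return entryId;
  }

  // Marks the entry COMPLETED once every segment in 'segmentsTo' has been uploaded.
  void endReplaceSegments(String entryId) {
    Entry entry = lineage.get(entryId);
    if (entry == null) {
      throw new IllegalArgumentException("Invalid lineage entry id: " + entryId);
    }
    if (!tableSegments.containsAll(entry.segmentsTo)) {
      throw new IllegalArgumentException("Not all segments from 'segmentsTo' exist in the table");
    }
    entry.state = State.COMPLETED;
  }

  // Assumed visibility rule: a single state flip swaps which side of the entry is hidden.
  Set<String> visibleSegments() {
    Set<String> visible = new HashSet<>(tableSegments);
    for (Entry entry : lineage.values()) {
      visible.removeAll(entry.state == State.COMPLETED ? entry.segmentsFrom : entry.segmentsTo);
    }
    return visible;
  }
}
```

In this sketch a caller (a minion merge task or a push job) would call `startReplaceSegments`, upload the new segments, then call `endReplaceSegments`. An initial upload is the case where `segmentsFrom` is an empty list.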
   




[GitHub] [incubator-pinot] snleee edited a comment on pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee edited a comment on pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#issuecomment-660224442


   For more explanation of the batch upload protocol, see:
   https://docs.google.com/document/d/1-AKCfXNXdoNjFIvJ87wjWwFM_38gS0NCwFrIYjYsqp8/edit#
   




[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startReplaceSegments, endReplaceSegments controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r459142816



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,160 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start segment replace phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startReplaceSegments(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = SegmentLineageUtils.generateLineageEntryId();
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+

Review comment:
       Good point. It is very important that any segment appears in `segmentsFrom` at most once; otherwise we will have a double-counting issue. To prevent that, we will do the following:
    
   1. The merge task scheduler will check which segments belong to running tasks (started but not finished) so that it does not schedule segments that are already scheduled.
   2. During `startReplaceSegments`, we reject the request if any segment in the input `segmentsFrom` appears in the `segmentsFrom` field of any entry in the segment lineage.
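
The check in item 2 can be sketched as follows. This is a minimal illustration, not the PR's code: `SegmentsFromCheckSketch` and `findConflictingEntry` are hypothetical names, and the lineage is reduced to a plain map from lineage entry id to that entry's `segmentsFrom` list.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper; the lineage is modeled as entryId -> segmentsFrom for illustration only.
class SegmentsFromCheckSketch {
  // Returns the id of the first existing entry whose segmentsFrom overlaps the request,
  // or null if the requested segments have not been used as a source before.
  static String findConflictingEntry(Map<String, List<String>> segmentsFromByEntryId,
      List<String> requestedSegmentsFrom) {
    for (Map.Entry<String, List<String>> entry : segmentsFromByEntryId.entrySet()) {
      if (!Collections.disjoint(entry.getValue(), requestedSegmentsFrom)) {
        return entry.getKey();
      }
    }
    return null;
  }
}
```

A caller would reject the request whenever the result is non-null, which keeps each segment on the source side of at most one lineage entry.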






[GitHub] [incubator-pinot] snleee commented on a change in pull request #5712: Add startBatchUpload, endBatchUpload controller API

Posted by GitBox <gi...@apache.org>.
snleee commented on a change in pull request #5712:
URL: https://github.com/apache/incubator-pinot/pull/5712#discussion_r458492062



##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
##########
@@ -2164,6 +2168,167 @@ public boolean instanceExists(String instanceName) {
     return tableNamesWithType;
   }
 
+  /**
+   * Computes the start batch upload phase
+   *
+   * 1. Generate a batch id
+   * 2. Compute validation on the user inputs
+   * 3. Add the new lineage entry to the segment lineage metadata in the property store
+   *
+   * Update is done with retry logic along with read-modify-write block for achieving atomic update of the lineage
+   * metadata.
+   *
+   * @param tableNameWithType Table name with type
+   * @param segmentsFrom a list of segments to be merged
+   * @param segmentsTo a list of merged segments
+   * @return Batch Id
+   *
+   * @throws InvalidConfigException
+   */
+  public String startBatchUpload(String tableNameWithType, List<String> segmentsFrom, List<String> segmentsTo) {
+    // Create a batch id
+    String batchId = UUID.randomUUID().toString();
+
+    // Check that segmentsTo is not empty.
+    Preconditions.checkArgument(!segmentsTo.isEmpty(), String
+        .format("'segmentsTo' cannot be empty (tableName = '%s', segmentsFrom = '%s', segmentsTo = '%s')",
+            tableNameWithType, segmentsFrom, segmentsTo));
+
+    // Check that all the segments from 'segmentsFrom' exist in the table
+    Set<String> segmentsForTable = new HashSet<>(getSegmentsFor(tableNameWithType));
+    Preconditions.checkArgument(segmentsForTable.containsAll(segmentsFrom), String.format(
+        "Not all segments from 'segmentsFrom' are available in the table. (tableName = '%s', segmentsFrom = '%s', "
+            + "segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom, segmentsTo,
+        segmentsForTable));
+
+    // Check that none of the segments from 'segmentsTo' exist in the table.
+    Preconditions.checkArgument(Collections.disjoint(segmentsForTable, segmentsTo), String.format(
+        "Any segments from 'segmentsTo' should not be available in the table at this point. (tableName = '%s', "
+            + "segmentsFrom = '%s', segmentsTo = '%s', segmentsFromTable = '%s')", tableNameWithType, segmentsFrom,
+        segmentsTo, segmentsForTable));
+
+    try {
+      final List<String> finalSegmentsFrom = segmentsFrom;
+      DEFAULT_RETRY_POLICY.attempt(() -> {
+        // Fetch the segment lineage metadata
+        ZNRecord segmentLineageZNRecord =
+            SegmentLineageAccessHelper.getSegmentLineageZNRecord(_propertyStore, tableNameWithType);
+        SegmentLineage segmentLineage;
+        int expectedVersion = -1;
+        if (segmentLineageZNRecord == null) {
+          segmentLineage = new SegmentLineage(tableNameWithType);
+        } else {
+          segmentLineage = SegmentLineage.fromZNRecord(segmentLineageZNRecord);
+          expectedVersion = segmentLineageZNRecord.getVersion();
+        }
+
+        // Check that the batchId doesn't exist in the segment lineage
+        Preconditions.checkArgument(segmentLineage.getLineageEntry(batchId) == null,
+            String.format("BatchId (%s) already exists in the segment lineage.", batchId));
+
+        // Check

Review comment:
       removed



