Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/12/24 23:09:41 UTC

[GitHub] [incubator-pinot] amarnathkarthik opened a new pull request #6382: Compatibility test for segment operations upload and delete

amarnathkarthik opened a new pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382


   ## Description
   Implemented compatibility test for Segment UPLOAD and DELETE operations.
   1. **_UPLOAD_**: Requires the inputs below; generates the segment, compresses it to a tar.gz file, and uploads it to the controller. It then validates that the uploaded segment is present on the controller and in the ONLINE state.
   > - inputDataFileName
   > - schemaFileName
   > - tableConfigFileName
   > - recordReaderConfigFileName
   > - segmentName
   2. **_DELETE_**: Requires the inputs below; deletes the segment and then validates that the deletion succeeded.
   > - tableConfigFileName
   > - segmentName
   
   **_Note:_**
   - The Segment UPLOAD and DELETE operations require TableOp to finish creating the schema and table before SegmentOp is invoked.
   
   **Test Case:**
   1. UPLOAD - Tested segment generation, compression to a tar.gz file, segment upload, validation that the upload succeeded, and cleanup of temporary directories.
   2. DELETE - Tested segment deletion and validation that the segment was deleted successfully.
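The "validate upload segment successful" step above amounts to polling the controller until the segment count in the expected state becomes positive or a timeout elapses. A minimal, self-contained sketch of that polling pattern (the helper name, timeout, and interval here are illustrative, not the PR's actual implementation):

```java
import java.util.function.LongSupplier;

// Hedged sketch: poll a count supplier until it goes positive or a
// timeout elapses. Mirrors the upload-verification loop described above.
public class PollUntilOnline {
  static boolean waitForPositiveCount(LongSupplier countSupplier,
      long timeoutMs, long sleepIntervalMs) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    while (countSupplier.getAsLong() <= 0) {
      if (System.currentTimeMillis() - startTime > timeoutMs) {
        return false; // gave up: segment never reached the expected state
      }
      Thread.sleep(sleepIntervalMs);
    }
    return true;
  }
}
```

In the PR, the supplier would query the controller for the number of segments in the desired state; here it is abstracted so the retry/timeout logic stands alone.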
   
   
   Issue #4854 
   
   ## Upgrade Notes
   Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   If you have tagged this as either backward-incompat or release-notes,
   you MUST add text here that you would like to see appear in release notes of the
   next release.
   
If you have a series of commits adding or enabling a feature, then
add this section only in the final commit that marks the feature complete.
Refer to earlier release notes for examples of such text.
   
   ## Documentation
   If you have introduced a new feature or configuration, please add it to the documentation as well.
   See https://docs.pinot.apache.org/developers/developers-and-contributors/update-document
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549048775



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {

Review comment:
       Same as above: I will move this to a separate method, but I need clarification on the state. The call to the external view returns two table views, OFFLINE and REALTIME; is there a specific state for each table view that should be considered ONLINE? Can an OFFLINE or REALTIME table view have either of the states `ONLINE` or `CONSUMING`, and is that considered ONLINE?
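To make the question concrete, here is a hedged sketch of the kind of check under discussion, with the external view modeled simply as segment -> (instance -> state). The map shape, helper name, and the rule that REALTIME replicas may also be `CONSUMING` are assumptions for illustration, not the actual Pinot API:

```java
import java.util.Map;

// Hedged sketch: count segments whose replicas are all in an
// "online-like" state (ONLINE for OFFLINE tables; ONLINE or CONSUMING
// for REALTIME tables), given a simplified external-view map.
public class ExternalViewCheck {
  static long countOnlineSegments(Map<String, Map<String, String>> externalView,
      boolean realtime) {
    return externalView.values().stream()
        .filter(instanceStates -> instanceStates.values().stream()
            .allMatch(state -> state.equals("ONLINE")
                || (realtime && state.equals("CONSUMING"))))
        .count();
  }
}
```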





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548765615



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();

Review comment:
       Both should be the same, since the segment name is passed in when generating the segment, but the one above returns the actual segment name used during generation. If the segment-naming logic changes to use either a sequence id or min/max values, this implementation would still return the correct segment name.





[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#issuecomment-751130863


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=h1) Report
   > Merging [#6382](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=desc) (bb5c23f) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.09%`.
   > The diff coverage is `56.80%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6382/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6382      +/-   ##
   ==========================================
   - Coverage   66.44%   65.35%   -1.10%     
   ==========================================
     Files        1075     1305     +230     
     Lines       54773    63104    +8331     
     Branches     8168     9164     +996     
   ==========================================
   + Hits        36396    41241    +4845     
   - Misses      15700    18926    +3226     
   - Partials     2677     2937     +260     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `65.35% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <0.00%> (+9.52%)` | :arrow_up: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1159 more](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=footer). Last update [6d8b09e...bb5c23f](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   



[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548766320



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       If the process is killed mid-way, the next execution will create a new directory since the name is suffixed with a UUID, so it should not impact the test. IMO, the general convention when using `FileUtils.getTempDirectory()` is that the file/directory will be deleted on exit, so I don't think explicit cleanup is required; also, the `pinot-compat-test` prefix should provide context in case the files are listed via the OS file system.
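A minimal, self-contained sketch of the convention described above: a per-run directory under the system temp dir, suffixed with a UUID so a killed run never collides with the next one (class and method names here are illustrative, using only the JDK rather than commons-io):

```java
import java.io.File;
import java.io.IOException;
import java.util.UUID;

// Hedged sketch: UUID-suffixed temp directory with best-effort cleanup.
public class TempDirExample {
  static File newRunDir() throws IOException {
    File dir = new File(System.getProperty("java.io.tmpdir"),
        "pinot-compat-test-" + UUID.randomUUID());
    if (!dir.mkdirs()) {
      throw new IOException("Could not create " + dir);
    }
    dir.deleteOnExit(); // best-effort cleanup on normal JVM exit
    return dir;
  }
}
```

`deleteOnExit()` only removes empty directories on normal JVM termination, which is why the PR additionally deletes the tree in a `finally` block via `FileUtils.deleteQuietly`.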





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548778157



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       Fixed.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();

Review comment:
       Fixed.





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551516583



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    long offlineSegmentCount =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equalsIgnoreCase(_segmentName)).filter(v -> v.getValue().values().contains(state))

Review comment:
       Considering the below sample table's external view, where the number of replicas is 2, is the expectation that both `Server_hostname1_8001` and `Server_hostname2_8001` should be in `ONLINE` state, and otherwise the segment should be derived as not `ONLINE`?
   
   **Sample Table External view:**
   ```json
   {
     "id" : "table_OFFLINE",
     "simpleFields" : {
       "BUCKET_SIZE" : "0",
       "INSTANCE_GROUP_TAG" : "table_OFFLINE",
       "MAX_PARTITIONS_PER_INSTANCE" : "1",
       "NUM_PARTITIONS" : "1",
       "REBALANCE_MODE" : "CUSTOMIZED",
       "REPLICAS" : "2"
     },
     "mapFields" : {
       "account_summary_additive_daily_0" : {
         "Server_hostname1_8001" : "ONLINE",
         "Server_hostname2_8001" : "ONLINE"
       }
     },
     "listFields" : {
     }
   }
   ```
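One way to express the "all replicas ONLINE" expectation discussed above is a check over the external view's `mapFields`. The sketch below models `mapFields` as a plain `Map` and is illustrative only; the method name and the `Map`-based model are assumptions, not Pinot APIs:

```java
import java.util.Map;

public class ReplicaStateCheck {
  // Returns true only if the segment is present and every replica reports the expected state.
  static boolean allReplicasInState(Map<String, Map<String, String>> mapFields,
                                    String segmentName, String expectedState) {
    Map<String, String> replicaStates = mapFields.get(segmentName);
    if (replicaStates == null || replicaStates.isEmpty()) {
      return false; // segment absent from the external view
    }
    return replicaStates.values().stream().allMatch(expectedState::equalsIgnoreCase);
  }

  public static void main(String[] args) {
    // Mirrors the sample external view above: two replicas, both ONLINE.
    Map<String, Map<String, String>> mapFields = Map.of(
        "account_summary_additive_daily_0",
        Map.of("Server_hostname1_8001", "ONLINE", "Server_hostname2_8001", "ONLINE"));
    System.out.println(
        allReplicasInState(mapFields, "account_summary_additive_daily_0", "ONLINE")); // prints true
  }
}
```

With this shape, a segment with one replica `ONLINE` and one in any other state would not count as `ONLINE`, which matches the stricter reading of the question.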




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551509121



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)

Review comment:
       Realtime segments are not pushed in this manner. They are created by Pinot and pushed into the deep store.
   You have two choices:
   (1) Pick up the table config and check that the table type is OFFLINE. If not, fail the operation. If it is OFFLINE, then get the offline externalview. This is the more rigorous way of doing it.
   (2) Assume that the operation is specified correctly, and just check the offline side.
   
   When we start adding realtime events and segments get generated, we will move this code to wherever it fits so that we can re-use the method to check for realtime segments being ONLINE or CONSUMING state.
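Option (1) above amounts to a fail-fast guard on the table type before fetching the offline external view. Pinot table names carry a type suffix (e.g. `table_OFFLINE` in the sample external view's `id`); the helper below is a hedged sketch of that guard using the suffix convention, not the actual Pinot `TableNameBuilder`/`TableType` API:

```java
public class OfflineTableGuard {
  // Illustrative guard: only OFFLINE tables accept pushed segments, so the op
  // should fail fast for anything else before checking the external view.
  static boolean isOfflineTable(String tableNameWithType) {
    return tableNameWithType != null && tableNameWithType.endsWith("_OFFLINE");
  }

  public static void main(String[] args) {
    System.out.println(isOfflineTable("table_OFFLINE"));  // prints true
    System.out.println(isOfflineTable("table_REALTIME")); // prints false
  }
}
```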






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549047843



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {
+        if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+          LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+              DEFAULT_MAX_SLEEP_TIME_MS);
+          return false;
+        }
+        LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_WAIT_TIME_MS);
+        Thread.sleep(DEFAULT_WAIT_TIME_MS);
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() > 0) {

Review comment:
       I do agree that, for better code readability, moving the verification code to a separate method makes sense, but I did not quite understand `We cannot assume that this is the last segment to be deleted`. `SegmentOp` is implemented based on your design that it will be called multiple times during upgrade/downgrade, but whenever it's called it will be for 1 segment. Let me know if my understanding is not correct.






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549156061



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";
+  private static final int DEFAULT_MAX_SLEEP_TIME_MS = 30000;
+  private static final int DEFAULT_WAIT_TIME_MS = 5000;

Review comment:
       ```suggestion
     private static final int DEFAULT_SLEEP_INTERVAL_MS = 200;
   ```

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {
+        if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+          LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+              DEFAULT_MAX_SLEEP_TIME_MS);
+          return false;
+        }
+        LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_WAIT_TIME_MS);
+        Thread.sleep(DEFAULT_WAIT_TIME_MS);
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() > 0) {

Review comment:
       Valid question.
   So, assuming that we create a table and add a segment each between phases of upgrades (there are six) and delete some of them in some phases. So, we will start with 0 segments, and maybe end with 2 or 3, while going up to 6.
   
   In this case, a count of online segments can be anything.
   
   So,  all we want to make sure is that the given segment (segment name) is in the state we want it to be. If deleted, then we want to make sure that it disappeared from externalview.
   
   A segment is deleted when it goes away from externalview. If it is present, it better not be in ERROR state (unless we intend that). For now, let us just implement plain old ADD and DELETE operations and check for ONLINE state in the case  of adding a segment, and for not being there in the case of a delete.
   
   So, the best way seems to be to get the externalview, parse it into a json object, and look for specific fields.
   
   And while we are there, might as well account for all replicas being in the same state rather than just one replica. If we add test cases involving replicas this will be one less thing to take care of.
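The delete side of the check described above has a simple shape: a segment counts as deleted only once it no longer appears in the external view at all. A minimal sketch, again modeling the external view's `mapFields` as a plain `Map` (an assumption, not the Pinot `TableViews` type):

```java
import java.util.Map;

public class DeleteVerificationSketch {
  // A segment is considered deleted only when it is entirely absent from the
  // external view, regardless of how many other segments remain.
  static boolean segmentGone(Map<String, Map<String, String>> mapFields, String segmentName) {
    return !mapFields.containsKey(segmentName);
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> view =
        Map.of("seg_0", Map.of("Server_1", "ONLINE", "Server_2", "ONLINE"));
    System.out.println(segmentGone(view, "seg_0")); // prints false
    System.out.println(segmentGone(view, "seg_1")); // prints true
  }
}
```

Checking for absence by name, rather than a count of online segments, stays correct even when other segments are added or removed between upgrade phases.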






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549163392



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {

Review comment:
       Fixed as stated above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551663395



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,219 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
+    }
+    return true;
+  }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
     }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long segmentCount;
+    while ((segmentCount = getSegmentCountInState(state)) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      } else if (segmentCount == -1) {
+        LOGGER.error("Upload segment verification failed, one or more segment(s) is in {} state.",
+            CommonConstants.Helix.StateModel.SegmentStateModel.ERROR);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
     return true;
   }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for OFFLINE which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return -1 in case of ERROR, 1 if all matches the state else 0.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    TableViews.TableView externalView = getExternalViewForTable();
+    final Set<String> segmentState =
+        externalView.offline != null ? externalView.offline.entrySet().stream()
+            .filter(k -> k.getKey().equals(_segmentName)).flatMap(x -> x.getValue().values().stream())
+            .collect(Collectors.toSet()) : Collections.emptySet();
+
+    if (segmentState.contains(CommonConstants.Helix.StateModel.SegmentStateModel.ERROR)) {
+      return -1;
+    }
+
+    // allMatch() is vacuously true on an empty set, so require at least one replica in the desired state.
+    return !segmentState.isEmpty() && segmentState.stream().allMatch(x -> x.contains(state)) ? 1 : 0;

Review comment:
       My bad. Yes, you need an integer return, since we are also handling ERROR.
   Thanks,
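   The tri-state return discussed here (-1 for ERROR, 1 when all replicas match, 0 otherwise) can be sketched in isolation; the class and method names below are illustrative, not from the PR:

```java
import java.util.Set;

public class SegmentStateCount {
    static final String ERROR = "ERROR";

    // -1 if any replica is in ERROR, 1 if at least one replica exists and
    // all replicas match the desired state, 0 otherwise (including the case
    // where the external view has not propagated yet).
    static int countInState(Set<String> replicaStates, String desiredState) {
        if (replicaStates.contains(ERROR)) {
            return -1;
        }
        if (replicaStates.isEmpty()) {
            return 0;
        }
        return replicaStates.stream().allMatch(desiredState::equals) ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println(countInState(Set.of("ONLINE"), "ONLINE"));
    }
}
```

   The caller would treat -1 as a hard failure, 0 as "retry after a short sleep", and 1 as success.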





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548766320



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       If the process is killed mid-way, the next execution will create a new directory, since the name is suffixed with a UUID, so it should not impact the test. IMO, the general convention when using `FileUtils.getTempDirectory()` is that the file/directory will be deleted on exit, hence I don't think it is required; also, the `pinot-compat-test` prefix should provide enough context in case the files are listed via the OS file system.
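   The naming scheme described above can be sketched as follows; `newCompatTestTempDir` is a hypothetical helper, not part of the PR:

```java
import java.io.File;
import java.util.UUID;

public class TempDirs {
    // A UUID suffix avoids collisions with directories left behind by a run
    // that was killed mid-way; the "pinot-compat-test" prefix keeps the
    // directory identifiable when listing the OS temp directory.
    static File newCompatTestTempDir() {
        File dir = new File(System.getProperty("java.io.tmpdir"),
            "pinot-compat-test-" + UUID.randomUUID());
        dir.deleteOnExit();  // best-effort cleanup on normal JVM exit
        return dir;
    }
}
```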





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548769083



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       I want to avoid the case where an overloaded system causes intermittent failures. Let us code it so that we are covered for a reasonably high delay, but the check completes quickly if the segment is ready sooner.
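   The retry pattern being asked for — tolerate a reasonably high total delay, but return as soon as the condition holds — can be sketched as below. The helper name and the interval/timeout values are illustrative, not from the PR:

```java
import java.util.function.LongSupplier;

public class PollUntil {
    // Polls countSupplier every intervalMs until it yields a positive value,
    // returning true immediately when it does, or false once maxWaitMs has
    // elapsed. An overloaded system thus gets plenty of time, while a healthy
    // one passes on the first poll.
    static boolean waitForPositiveCount(LongSupplier countSupplier, long maxWaitMs, long intervalMs) {
        long startTime = System.currentTimeMillis();
        while (countSupplier.getAsLong() <= 0) {
            if (System.currentTimeMillis() - startTime > maxWaitMs) {
                return false;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Condition that becomes true on the third poll.
        long[] calls = {0};
        System.out.println(waitForPositiveCount(() -> ++calls[0] >= 3 ? 1 : 0, 5_000, 10));
    }
}
```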





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548769018



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();

Review comment:
       If the segment name is not the same as what we provided, there is a bug, and you can return false. We will use this same segment name later in the delete command, and it had better work (instead of being some other segment name :)
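   The guard suggested here can be sketched as a small check after `driver.build()`; the method below is a hypothetical helper, not code from the PR:

```java
public class SegmentNameCheck {
    // After building, the driver's reported segment name must equal the one
    // we configured, because the later DELETE step targets the configured
    // name. A mismatch indicates a bug, so the op should return false.
    static boolean generatedNameMatches(String configuredName, String generatedName) {
        return configuredName != null && configuredName.equals(generatedName);
    }
}
```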





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551506521



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";

Review comment:
       Sure.





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549163369



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {
+        if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+          LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+              DEFAULT_MAX_SLEEP_TIME_MS);
+          return false;
+        }
+        LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_WAIT_TIME_MS);
+        Thread.sleep(DEFAULT_WAIT_TIME_MS);
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() > 0) {

Review comment:
       Thanks for the clarification. As suggested, I will implement a method to get the external view of the table, plus two other methods: (1) to get the count for the segment name in a specific state, and (2) to get the count of the segment name irrespective of state (for delete verification).
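   The two counting helpers agreed on above can be sketched over a simplified external-view shape (`Map<segmentName, Map<serverInstance, state>>`), standing in for the PR's `TableViews.TableView`; all names here are illustrative:

```java
import java.util.Map;

public class ExternalViewCounts {
    // (1) Count of the named segment's replicas that are in the given state.
    static long countForSegmentInState(Map<String, Map<String, String>> offlineView,
                                       String segmentName, String state) {
        if (offlineView == null || !offlineView.containsKey(segmentName)) {
            return 0;
        }
        return offlineView.get(segmentName).values().stream()
            .filter(state::equals)
            .count();
    }

    // (2) Presence of the segment irrespective of replica state, used to
    // verify deletion (0 means the segment is gone from the external view).
    static long countForSegmentName(Map<String, Map<String, String>> offlineView, String segmentName) {
        return offlineView != null && offlineView.containsKey(segmentName) ? 1 : 0;
    }
}
```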





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551506342



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)

Review comment:
       The implementation checks for the segment in the external view for both OFFLINE and REALTIME. Is there a specific reason why REALTIME should not be considered for this compatibility test? Aren't we planning to ingest the data from Kafka for these use cases?
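
   For reference, a minimal sketch of counting a segment's replicas in a desired state across both sides of the external view (the map shapes, class name, and segment/server names here are illustrative stand-ins for `TableViews.TableView`, not the actual PR code):

```java
import java.util.Map;
import java.util.stream.Stream;

public class SegmentStateCount {
  // Count replicas of a given segment that are in the desired state,
  // looking at both the OFFLINE and REALTIME sides of the external view.
  // Either side may be null when the table has no such component.
  static long countInState(Map<String, Map<String, String>> offline,
                           Map<String, Map<String, String>> realtime,
                           String segmentName, String state) {
    return Stream.of(offline, realtime)
        .filter(view -> view != null && view.containsKey(segmentName))
        .flatMap(view -> view.get(segmentName).values().stream())
        .filter(state::equals)
        .count();
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> offline =
        Map.of("mySegment", Map.of("Server_1", "ONLINE", "Server_2", "ONLINE"));
    System.out.println(countInState(offline, null, "mySegment", "ONLINE")); // 2
  }
}
```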




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551649765



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,221 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
+    }
+    return true;
+  }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
     }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long segmentCount;
+    while ((segmentCount = getSegmentCountInState(state)) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      } else if (segmentCount == -1) {
+        LOGGER.error("Upload segment verification failed, one or more segment(s) is in {} state.",
+            CommonConstants.Helix.StateModel.SegmentStateModel.ERROR);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
     return true;
   }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for OFFLINE which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return -1 in case of ERROR, 0 if in OFFLINE state else return count of state matching parameter.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    final Set<String> segmentState =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equals(_segmentName)).flatMap(x -> x.getValue().values().stream())
+            .collect(Collectors.toSet()) : Collections.emptySet();
+
+    if (segmentState.contains(CommonConstants.Helix.StateModel.SegmentStateModel.ERROR)) {

Review comment:
       Thanks for the clarification. Made changes to return -1 in case of `ERROR`, 1 if all segments match the desired state, and 0 otherwise.
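
   A minimal sketch of that tri-state convention (the class/method names are illustrative, not the actual `SegmentOp` code):

```java
import java.util.Set;

public class SegmentStateCheck {
  // Returns -1 if any replica is in ERROR, 1 if every replica is in the
  // desired state, and 0 otherwise (e.g. replicas still transitioning).
  static int checkState(Set<String> replicaStates, String desiredState) {
    if (replicaStates.contains("ERROR")) {
      return -1;
    }
    return !replicaStates.isEmpty()
        && replicaStates.stream().allMatch(desiredState::equals) ? 1 : 0;
  }

  public static void main(String[] args) {
    System.out.println(checkState(Set.of("ONLINE"), "ONLINE"));    // 1
    System.out.println(checkState(Set.of("CONSUMING"), "ONLINE")); // 0
    System.out.println(checkState(Set.of("ERROR"), "ONLINE"));     // -1
  }
}
```

   The caller can then treat -1 as an immediate failure and 0 as "keep polling until the timeout".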






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551621904



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";

Review comment:
       Fixed

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)

Review comment:
       Fixed






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551504105



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";
+  private static final int DEFAULT_MAX_SLEEP_TIME_MS = 30000;
+  private static final int DEFAULT_SLEEP_INTERVAL_MS = 200;
+
   public enum Op {
-    UPLOAD,
-    DELETE
+    UPLOAD, DELETE

Review comment:
       The IntelliJ formatter using the Pinot default code style moves these onto a single line; we may have to change codestyle-intellij.xml to take care of this issue.
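
   If we do change it, the relevant knob is likely the enum-constant wrap option in the Java section of codestyle-intellij.xml — something along these lines (the exact option name and value would need to be verified against IntelliJ's code style schema):

```xml
<codeStyleSettings language="JAVA">
  <!-- 2 = "wrap always": keep each enum constant on its own line -->
  <option name="ENUM_CONSTANTS_WRAP" value="2" />
</codeStyleSettings>
```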






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#issuecomment-751130863


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=h1) Report
   > Merging [#6382](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=desc) (ab67510) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.08%`.
   > The diff coverage is `56.80%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6382/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6382      +/-   ##
   ==========================================
   - Coverage   66.44%   65.36%   -1.09%     
   ==========================================
     Files        1075     1305     +230     
     Lines       54773    63104    +8331     
     Branches     8168     9164     +996     
   ==========================================
   + Hits        36396    41249    +4853     
   - Misses      15700    18924    +3224     
   - Partials     2677     2931     +254     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `65.36% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <0.00%> (+9.52%)` | :arrow_up: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1159 more](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=footer). Last update [6d8b09e...ab67510](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#issuecomment-751130863


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=h1) Report
   > Merging [#6382](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=desc) (bb5c23f) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **increase** coverage by `7.37%`.
   > The diff coverage is `73.06%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6382/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6382      +/-   ##
   ==========================================
   + Coverage   66.44%   73.82%   +7.37%     
   ==========================================
     Files        1075     1305     +230     
     Lines       54773    63104    +8331     
     Branches     8168     9164     +996     
   ==========================================
   + Hits        36396    46584   +10188     
   + Misses      15700    13493    -2207     
   - Partials     2677     3027     +350     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `44.81% <38.60%> (?)` | |
   | unittests | `65.35% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `44.44% <0.00%> (-4.40%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `78.57% <ø> (+5.40%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../common/config/tuner/TableConfigTunerRegistry.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1RhYmxlQ29uZmlnVHVuZXJSZWdpc3RyeS5qYXZh) | `72.00% <ø> (ø)` | |
   | [.../apache/pinot/common/exception/QueryException.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZXhjZXB0aW9uL1F1ZXJ5RXhjZXB0aW9uLmphdmE=) | `90.27% <ø> (+5.55%)` | :arrow_up: |
   | [...pinot/common/function/AggregationFunctionType.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vZnVuY3Rpb24vQWdncmVnYXRpb25GdW5jdGlvblR5cGUuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1104 more](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=footer). Last update [6d8b09e...bb5c23f](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551629011



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,221 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
+    }
+    return true;
+  }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
     }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify that the given segment of the given table is in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long segmentCount;
+    while ((segmentCount = getSegmentCountInState(state)) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      } else if (segmentCount == -1) {
+        LOGGER.error("Upload segment verification failed, one or more segment(s) is in {} state.",
+            CommonConstants.Helix.StateModel.SegmentStateModel.ERROR);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
     return true;
   }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify that the given segment has been deleted from the controller for the given table.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of OFFLINE segments that are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return -1 in case of ERROR, 0 if in OFFLINE state, else the count of segments in the given state.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    final Set<String> segmentState =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equals(_segmentName)).flatMap(x -> x.getValue().values().stream())
+            .collect(Collectors.toSet()) : Collections.emptySet();
+
+    if (segmentState.contains(CommonConstants.Helix.StateModel.SegmentStateModel.ERROR)) {

Review comment:
       Isn't it better to extract the specific segment instance state from the external view (Helix APIs) and explicitly check for the specific state we want? Then we can apply this method to any state we like -- CONSUMING, ONLINE, OFFLINE, etc.
   
   So,
   1. read externalview into json
   2. Extract the key `mapFields` (a map)
   3. Extract the key `_segmentName` (another map)
   4. Check all values to be the same as desired state
   
   This way, we can re-use the method to check for any state we like (which we will in the future).
   
   thanks
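   A rough sketch of the four-step check suggested above (the helper name and the plain `Map` types standing in for the parsed external-view JSON are hypothetical; the real code would obtain `mapFields` from the controller's external view endpoint and possibly use Helix types directly):

   ```java
   import java.util.Map;

   public class SegmentStateCheck {

     /**
      * Returns true only when every replica of the given segment is in the desired state.
      * mapFields mirrors the "mapFields" section of a Helix external view:
      * segmentName -> (instanceName -> state).
      */
     public static boolean allReplicasInState(Map<String, Map<String, String>> mapFields,
         String segmentName, String desiredState) {
       // Steps 2 and 3: extract the per-instance state map for the segment.
       Map<String, String> instanceStates = mapFields.get(segmentName);
       if (instanceStates == null || instanceStates.isEmpty()) {
         // Segment missing from the external view.
         return false;
       }
       // Step 4: every instance's state must equal the desired state.
       return instanceStates.values().stream().allMatch(desiredState::equals);
     }
   }
   ```

   Because the desired state is a parameter, the same helper covers ONLINE, CONSUMING, OFFLINE, or any future state without further changes.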






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#issuecomment-751130863


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=h1) Report
   > Merging [#6382](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=desc) (d61e79d) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `21.58%`.
   > The diff coverage is `38.60%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6382/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #6382       +/-   ##
   ===========================================
   - Coverage   66.44%   44.86%   -21.59%     
   ===========================================
     Files        1075     1305      +230     
     Lines       54773    63104     +8331     
     Branches     8168     9164      +996     
   ===========================================
   - Hits        36396    28312     -8084     
   - Misses      15700    32450    +16750     
   + Partials     2677     2342      -335     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `44.86% <38.60%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...org/apache/pinot/broker/queryquota/HitCounter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9IaXRDb3VudGVyLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/pinot/broker/queryquota/MaxHitRateTracker.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9NYXhIaXRSYXRlVHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/broker/queryquota/QueryQuotaEntity.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9RdWVyeVF1b3RhRW50aXR5LmphdmE=) | `0.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ceselector/StrictReplicaGroupInstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1N0cmljdFJlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/TimeSegmentPruner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL1RpbWVTZWdtZW50UHJ1bmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/interval/Interval.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...r/routing/segmentpruner/interval/IntervalTree.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsVHJlZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | ... and [1305 more](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=footer). Last update [6d8b09e...d61e79d](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548765663



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();
+    File indexDir = new File(outputDir, segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Added a scheduler delay in the `getTableExternalView()` method. For now, it is set to 5 seconds, which I think should be good enough for the compatibility test.






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551492153



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";

Review comment:
       Please pick this up from CommonConstants

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)

Review comment:
       The code here will work, but it seems to imply that a segment can exist in both the realtime and offline tables with the same name.
   
   You can choose to just consider the offline side for now. Let us add the realtime side later.
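Considering only the offline side, as the reviewer suggests, could look like the sketch below. This is an illustration, not the PR's actual code: the `Map` shape (segment name, then instance, then state) mirrors what a Helix external view typically contains, and the class and method names are hypothetical.

```java
import java.util.Map;

// Hypothetical helper: counts offline segments with the given name that have
// at least one replica reporting the target state. Only the OFFLINE side is
// considered, per the review comment; the REALTIME side can be added later.
public class OfflineSegmentCounter {
  public static long countInState(Map<String, Map<String, String>> offlineView,
                                  String segmentName, String state) {
    if (offlineView == null) {
      return 0;
    }
    return offlineView.entrySet().stream()
        // exact match: segment names are case-sensitive
        .filter(e -> e.getKey().equals(segmentName))
        .filter(e -> e.getValue().containsValue(state))
        .count();
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> view =
        Map.of("mySegment", Map.of("Server_1", "ONLINE", "Server_2", "ONLINE"));
    System.out.println(countInState(view, "mySegment", "ONLINE")); // prints 1
    System.out.println(countInState(view, "MYSEGMENT", "ONLINE")); // prints 0 (case-sensitive)
  }
}
```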

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    long offlineSegmentCount =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equalsIgnoreCase(_segmentName)).filter(v -> v.getValue().values().contains(state))

Review comment:
       Segment names are case-sensitive. Please do not ignore case.
   Also, if there are two replicas, and one of them is ONLINE and the other is not, how will this work?
   Instead of counting the number, can we get the state of all replicas of the segment and return true ONLY if they are in the intended state?
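The per-replica check the reviewer asks for could be sketched as follows. The view `Map` shape (segment name, then instance, then state) and the class name are assumptions for illustration; this is not the PR's code.

```java
import java.util.Map;

// Hypothetical check: true only if the segment exists in the view and ALL of
// its replicas report the intended state. A caller noticing an ERROR replica
// could bail out early instead of waiting for the full timeout.
public class ReplicaStateCheck {
  public static boolean allReplicasInState(Map<String, Map<String, String>> view,
                                           String segmentName, String state) {
    if (view == null) {
      return false;
    }
    // Exact, case-sensitive lookup of the segment.
    Map<String, String> replicaStates = view.get(segmentName);
    if (replicaStates == null || replicaStates.isEmpty()) {
      return false;
    }
    // Every instance hosting the segment must be in the intended state.
    return replicaStates.values().stream().allMatch(state::equals);
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> view = Map.of(
        "seg1", Map.of("Server_1", "ONLINE", "Server_2", "OFFLINE"));
    System.out.println(allReplicasInState(view, "seg1", "ONLINE")); // prints false
  }
}
```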

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";
+  private static final int DEFAULT_MAX_SLEEP_TIME_MS = 30000;
+  private static final int DEFAULT_SLEEP_INTERVAL_MS = 200;
+
   public enum Op {
-    UPLOAD,
-    DELETE
+    UPLOAD, DELETE

Review comment:
       nit:  I prefer one per line for readability





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551530533



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    long offlineSegmentCount =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equalsIgnoreCase(_segmentName)).filter(v -> v.getValue().values().contains(state))

Review comment:
       Yes, the expectation is that all replicas reach ONLINE. Some replicas may take more time to get there than others, so we should wait until all replicas reach ONLINE state. If any replica reaches ERROR state, we can bail out early with a failure.





[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548765100



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Yes, an uploaded segment takes some time to show up in the external view API. Instead of a sleep, I already added a scheduler delay in the `getTableExternalView()` method. For now, it is 5 seconds, which I think is good enough for the compatibility test.
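A fixed 5-second delay can either waste time or be too short under load; a deadline-based poll, the pattern the PR's later `verifySegmentInState()`/`verifySegmentDeleted()` methods use with `DEFAULT_MAX_SLEEP_TIME_MS` and `DEFAULT_SLEEP_INTERVAL_MS`, is usually more robust. A generic sketch of that pattern (the utility class is hypothetical, not the PR's code):

```java
import java.util.function.BooleanSupplier;

// Hypothetical utility: polls a condition until it holds or a deadline
// passes, sleeping a short interval between attempts. Returns true as soon
// as the condition holds, false if the deadline expires first.
public class Poller {
  public static boolean waitForCondition(BooleanSupplier condition,
                                         long maxWaitMs, long intervalMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + maxWaitMs;
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() >= deadline) {
        return false; // condition never held within the allowed window
      }
      Thread.sleep(intervalMs);
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    // Condition becomes true roughly 100 ms after start; the poll should
    // observe it well within the 2-second budget.
    boolean ok = waitForCondition(
        () -> System.currentTimeMillis() - start > 100, 2000, 20);
    System.out.println(ok); // prints true
  }
}
```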





[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548763761



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       It may be better to also add a `deleteOnExit()` on this file handle, just in case someone starts the test suite and kills it mid-way?
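
   A minimal standalone sketch of that suggestion (not the PR's code, which uses commons-io `FileUtils`; plain `java.io.File` is used here to stay self-contained). Note that `deleteOnExit()` only runs on an orderly JVM shutdown (normal exit, Ctrl-C/SIGTERM), not on a hard `kill -9`:

   ```java
   import java.io.File;
   import java.util.UUID;

   public final class TempDirDemo {
     public static void main(String[] args) {
       // Per-run temp dir, mirroring the PR's naming scheme.
       File tmp = new File(System.getProperty("java.io.tmpdir"),
           "pinot-compat-test-" + UUID.randomUUID());
       if (!tmp.mkdirs()) {
         throw new AssertionError("could not create " + tmp);
       }
       // Registered paths are removed when the JVM exits normally,
       // covering the case where the finally-block cleanup never runs.
       tmp.deleteOnExit();
       System.out.println(tmp.exists());
     }
   }
   ```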

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Since it takes some time for the segment to make it to the external view, it is best to put this in a while loop. I suggest:
   `
   while (segmentNotOnline()) {
     sleep(100ms)
     if (it took more than max time) {
       return false
     }
   }
   `
   Max time can be hardcoded as 30s.
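
   That suggestion can be sketched as a small standalone helper (not the PR's code; `isSegmentOnline` is a placeholder for whatever external-view check the test performs, with the 30 s cap and a short poll interval hardcoded as suggested):

   ```java
   import java.util.function.BooleanSupplier;

   public final class PollUntilOnline {
     // Hard cap and poll interval as suggested in the review.
     private static final long MAX_WAIT_MS = 30_000;
     private static final long SLEEP_INTERVAL_MS = 100;

     // isSegmentOnline.getAsBoolean() stands in for the external-view check.
     static boolean waitForOnline(BooleanSupplier isSegmentOnline)
         throws InterruptedException {
       long start = System.currentTimeMillis();
       while (!isSegmentOnline.getAsBoolean()) {
         if (System.currentTimeMillis() - start > MAX_WAIT_MS) {
           return false; // gave up after the max wait
         }
         Thread.sleep(SLEEP_INTERVAL_MS);
       }
       return true;
     }

     public static void main(String[] args) throws InterruptedException {
       // Toy check that succeeds on the third poll.
       final int[] polls = {0};
       boolean ok = waitForOnline(() -> ++polls[0] >= 3);
       System.out.println(ok + " after " + polls[0] + " polls");
     }
   }
   ```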

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Also, in general we may have more than one replica of the segment, so it may be better to parse the external view for all replicas. True, we will have only one replica to start with, but I would like to be able to extend the tests along that dimension if needed.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();

Review comment:
       Why do we have this? Why not just use `_segmentName`?

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();
+    File indexDir = new File(outputDir, segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Same comment about looping with sleep and handling multiple replicas.






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551659343



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,219 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
+    }
+    return true;
+  }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
     }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    long segmentCount;
+    while ((segmentCount = getSegmentCountInState(state)) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      } else if (segmentCount == -1) {
+        LOGGER.error("Upload segment verification failed, one or more segment(s) is in {} state.",
+            CommonConstants.Helix.StateModel.SegmentStateModel.ERROR);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
     return true;
   }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for OFFLINE which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return -1 in case of ERROR, 1 if all matches the state else 0.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    final Set<String> segmentState =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equals(_segmentName)).flatMap(x -> x.getValue().values().stream())
+            .collect(Collectors.toSet()) : Collections.emptySet();
+
+    if (segmentState.contains(CommonConstants.Helix.StateModel.SegmentStateModel.ERROR)) {
+      return -1;
+    }
+
+    return segmentState.stream().allMatch(x -> x.contains(state)) ? 1 : 0;

Review comment:
       Better if this is a boolean return?
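
   A minimal sketch of that suggestion (standalone, not the PR's code; `allSegmentsInState` is a placeholder name). Returning a boolean drops the -1/0/1 encoding, though it collapses ERROR and not-yet-online into `false`, so any ERROR-specific logging would need to move inside this method:

   ```java
   import java.util.Set;

   public final class StateCheck {
     // True only when every observed replica state equals the wanted state;
     // an ERROR replica can never converge, so it short-circuits to false.
     static boolean allSegmentsInState(Set<String> observedStates, String wanted) {
       if (observedStates.contains("ERROR")) {
         return false;
       }
       return !observedStates.isEmpty()
           && observedStates.stream().allMatch(wanted::equals);
     }

     public static void main(String[] args) {
       System.out.println(allSegmentsInState(Set.of("ONLINE"), "ONLINE"));          // true
       System.out.println(allSegmentsInState(Set.of("ONLINE", "ERROR"), "ONLINE")); // false
     }
   }
   ```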






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551516583



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify that the segment for the given table name and segment name has been deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    long offlineSegmentCount =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equalsIgnoreCase(_segmentName)).filter(v -> v.getValue().values().contains(state))

Review comment:
       Considering the sample table external view below, where REPLICAS is 2, is the expectation that both `Server_hostname1_8001` and `Server_hostname2_8001` should be in `ONLINE` state, and otherwise the segment should be derived as `OFFLINE`?
   
   **Sample Table External view:**
   ```
   {
     "id" : "table_OFFLINE",
     "simpleFields" : {
       "BUCKET_SIZE" : "0",
       "INSTANCE_GROUP_TAG" : "table_OFFLINE",
       "MAX_PARTITIONS_PER_INSTANCE" : "1",
       "NUM_PARTITIONS" : "1",
       "REBALANCE_MODE" : "CUSTOMIZED",
       "REPLICAS" : "2"
     },
     "mapFields" : {
       "account_summary_additive_daily_0" : {
         "Server_hostname1_8001" : "ONLINE",
         "Server_hostname2_8001" : "ONLINE"
       }
     },
     "listFields" : {
     }
   }
   ```
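   To make the question concrete, here is a minimal sketch (not code from the PR; the class and method names are illustrative) of deriving a segment's effective state from its external-view replica map, treating the segment as ONLINE only when every replica reports ONLINE:

```java
import java.util.Map;

// Hypothetical helper, not part of SegmentOp: given the external-view
// "mapFields" entry for one segment (replica server -> state), decide
// whether the segment as a whole should be considered ONLINE.
public class SegmentStateCheck {
  static boolean isSegmentOnline(Map<String, String> replicaStates) {
    // All replicas must report ONLINE; an empty map means the segment
    // is not visible in the external view yet.
    return !replicaStates.isEmpty()
        && replicaStates.values().stream().allMatch("ONLINE"::equalsIgnoreCase);
  }

  public static void main(String[] args) {
    Map<String, String> bothOnline = Map.of(
        "Server_hostname1_8001", "ONLINE",
        "Server_hostname2_8001", "ONLINE");
    Map<String, String> oneOffline = Map.of(
        "Server_hostname1_8001", "ONLINE",
        "Server_hostname2_8001", "OFFLINE");
    System.out.println(isSegmentOnline(bothOnline));  // true
    System.out.println(isSegmentOnline(oneOffline));  // false
  }
}
```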




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548765100



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Yes, an uploaded segment takes a while to show up in the external view API. Instead of a sleep, I added a scheduler delay in the `getTableExternalView()` method. For now it is set to 5 seconds, which I think should be good enough for the compatibility test.
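   The bounded-wait pattern the PR's verification methods rely on can be sketched generically as follows (a minimal illustration, not the PR's actual code; the class and method names are made up):

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper: poll until a condition holds, giving up after a
// maximum wait, rather than sleeping once for a fixed delay.
public class BoundedWait {
  static boolean waitUntil(BooleanSupplier condition, long maxWaitMs, long intervalMs)
      throws InterruptedException {
    long start = System.currentTimeMillis();
    while (!condition.getAsBoolean()) {
      if (System.currentTimeMillis() - start > maxWaitMs) {
        return false;  // condition never became true within maxWaitMs
      }
      Thread.sleep(intervalMs);  // back off before re-checking
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    long deadline = System.currentTimeMillis() + 200;
    // Condition becomes true after ~200 ms; poll every 50 ms, up to 2 s.
    boolean ok = waitUntil(() -> System.currentTimeMillis() >= deadline, 2000, 50);
    System.out.println(ok);
  }
}
```

   Polling with a timeout keeps the test fast when the external view updates quickly, while still tolerating slower clusters up to the maximum wait.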






[GitHub] [incubator-pinot] codecov-io edited a comment on pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
codecov-io edited a comment on pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#issuecomment-751130863


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=h1) Report
   > Merging [#6382](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=desc) (d658cc9) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `22.18%`.
   > The diff coverage is `38.60%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6382/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree)
   
   ```diff
   @@             Coverage Diff             @@
   ##           master    #6382       +/-   ##
   ===========================================
   - Coverage   66.44%   44.26%   -22.19%     
   ===========================================
     Files        1075     1318      +243     
     Lines       54773    64106     +9333     
     Branches     8168     9329     +1161     
   ===========================================
   - Hits        36396    28376     -8020     
   - Misses      15700    33379    +17679     
   + Partials     2677     2351      -326     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration | `44.26% <38.60%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `52.83% <0.00%> (-13.84%)` | :arrow_down: |
   | [...org/apache/pinot/broker/queryquota/HitCounter.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9IaXRDb3VudGVyLmphdmE=) | `0.00% <0.00%> (-100.00%)` | :arrow_down: |
   | [...che/pinot/broker/queryquota/MaxHitRateTracker.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9NYXhIaXRSYXRlVHJhY2tlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...ache/pinot/broker/queryquota/QueryQuotaEntity.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcXVlcnlxdW90YS9RdWVyeVF1b3RhRW50aXR5LmphdmE=) | `0.00% <0.00%> (-50.00%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ceselector/StrictReplicaGroupInstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL1N0cmljdFJlcGxpY2FHcm91cEluc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/TimeSegmentPruner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL1RpbWVTZWdtZW50UHJ1bmVyLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...roker/routing/segmentpruner/interval/Interval.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...r/routing/segmentpruner/interval/IntervalTree.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9zZWdtZW50cHJ1bmVyL2ludGVydmFsL0ludGVydmFsVHJlZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | ... and [1319 more](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=footer). Last update [6d8b09e...d658cc9](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551621980



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify that the segment for the given table name and segment name is in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify that the segment for the given table name and segment name has been deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully delete the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    long offlineSegmentCount =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equalsIgnoreCase(_segmentName)).filter(v -> v.getValue().values().contains(state))

Review comment:
       Fixed






[GitHub] [incubator-pinot] codecov-io commented on pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
codecov-io commented on pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#issuecomment-751130863


   # [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=h1) Report
   > Merging [#6382](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=desc) (5eca9cc) into [master](https://codecov.io/gh/apache/incubator-pinot/commit/1beaab59b73f26c4e35f3b9bc856b03806cddf5a?el=desc) (1beaab5) will **decrease** coverage by `1.23%`.
   > The diff coverage is `56.80%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-pinot/pull/6382/graphs/tree.svg?width=650&height=150&src=pr&token=4ibza2ugkz)](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #6382      +/-   ##
   ==========================================
   - Coverage   66.44%   65.21%   -1.24%     
   ==========================================
     Files        1075     1298     +223     
     Lines       54773    62769    +7996     
     Branches     8168     9112     +944     
   ==========================================
   + Hits        36396    40935    +4539     
   - Misses      15700    18912    +3212     
   - Partials     2677     2922     +245     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `65.21% <56.80%> (?)` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...e/pinot/broker/api/resources/PinotBrokerDebug.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYXBpL3Jlc291cmNlcy9QaW5vdEJyb2tlckRlYnVnLmphdmE=) | `0.00% <0.00%> (-79.32%)` | :arrow_down: |
   | [...ot/broker/broker/AllowAllAccessControlFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL0FsbG93QWxsQWNjZXNzQ29udHJvbEZhY3RvcnkuamF2YQ==) | `71.42% <ø> (-28.58%)` | :arrow_down: |
   | [.../helix/BrokerUserDefinedMessageHandlerFactory.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvYnJva2VyL2hlbGl4L0Jyb2tlclVzZXJEZWZpbmVkTWVzc2FnZUhhbmRsZXJGYWN0b3J5LmphdmE=) | `33.96% <0.00%> (-32.71%)` | :arrow_down: |
   | [...ker/routing/instanceselector/InstanceSelector.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtYnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9icm9rZXIvcm91dGluZy9pbnN0YW5jZXNlbGVjdG9yL0luc3RhbmNlU2VsZWN0b3IuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ava/org/apache/pinot/client/AbstractResultSet.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Fic3RyYWN0UmVzdWx0U2V0LmphdmE=) | `66.66% <0.00%> (+9.52%)` | :arrow_up: |
   | [.../main/java/org/apache/pinot/client/Connection.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0Nvbm5lY3Rpb24uamF2YQ==) | `35.55% <0.00%> (-13.29%)` | :arrow_down: |
   | [...inot/client/JsonAsyncHttpPinotClientTransport.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY2xpZW50cy9waW5vdC1qYXZhLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvY2xpZW50L0pzb25Bc3luY0h0dHBQaW5vdENsaWVudFRyYW5zcG9ydC5qYXZh) | `10.90% <0.00%> (-51.10%)` | :arrow_down: |
   | [...not/common/assignment/InstancePartitionsUtils.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vYXNzaWdubWVudC9JbnN0YW5jZVBhcnRpdGlvbnNVdGlscy5qYXZh) | `73.80% <ø> (+0.63%)` | :arrow_up: |
   | [...common/config/tuner/NoOpTableTableConfigTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL05vT3BUYWJsZVRhYmxlQ29uZmlnVHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | [...ot/common/config/tuner/RealTimeAutoIndexTuner.java](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree#diff-cGlub3QtY29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9waW5vdC9jb21tb24vY29uZmlnL3R1bmVyL1JlYWxUaW1lQXV0b0luZGV4VHVuZXIuamF2YQ==) | `100.00% <ø> (ø)` | |
   | ... and [1151 more](https://codecov.io/gh/apache/incubator-pinot/pull/6382/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=footer). Last update [b8bc74f...5eca9cc](https://codecov.io/gh/apache/incubator-pinot/pull/6382?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   




+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)
+      throws IOException {
+    long offlineSegmentCount =
+        getExternalViewForTable().offline != null ? getExternalViewForTable().offline.entrySet().stream()
+            .filter(k -> k.getKey().equalsIgnoreCase(_segmentName)).filter(v -> v.getValue().values().contains(state))

Review comment:
       Implemented a change to return failure immediately if any segment is in the `ERROR` state; otherwise it checks that none of the segments are `OFFLINE` and that all of them match the state we intend.
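The fail-fast check described in the comment above can be sketched as a standalone helper. This is a hypothetical illustration, not the PR's actual method: `evaluate` and its map of instance-to-state are assumptions made for the example.

```java
import java.util.Map;

public class SegmentStateCheck {
    // Hypothetical helper mirroring the described logic: fail immediately
    // if any replica reports ERROR; otherwise the segment is ready only
    // when every replica reports the target state, and we retry otherwise.
    static String evaluate(Map<String, String> instanceStates, String targetState) {
        if (instanceStates.containsValue("ERROR")) {
            return "FAIL";  // any ERROR replica fails the verification outright
        }
        boolean allMatch = instanceStates.values().stream()
            .allMatch(s -> s.equalsIgnoreCase(targetState));
        return allMatch ? "READY" : "RETRY";
    }
}
```

A caller would treat `RETRY` as "keep polling until the timeout" and `FAIL` as an immediate verification failure.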




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r551519877



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,220 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, upload the files to controller and verify segment upload.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+      return verifySegmentInState(STATE_ONLINE);
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Verify given table and segment name in the controller are in the state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return true if segment is in the state provided in the parameter, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentInState(String state)
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getSegmentCountInState(state) <= 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, state);
+    return true;
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+      return verifySegmentDeleted();
+    } catch (Exception e) {
+      LOGGER.error("Request to delete the segment {} for the table {} failed.", _segmentName, _tableName, e);
+      return false;
+    }
+  }
+
+  /**
+   * Verify given table name and segment name deleted from the controller.
+   * @return true if no segment found, else false.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private boolean verifySegmentDeleted()
+      throws IOException, InterruptedException {
+    long startTime = System.currentTimeMillis();
+    while (getCountForSegmentName() > 0) {
+      if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+        LOGGER.error("Delete segment verification failed, count is greater than zero after max wait time {} ms.",
+            DEFAULT_MAX_SLEEP_TIME_MS);
+        return false;
+      }
+      LOGGER.warn("Delete segment verification count greater than zero, will retry after {} ms.",
+          DEFAULT_SLEEP_INTERVAL_MS);
+      Thread.sleep(DEFAULT_SLEEP_INTERVAL_MS);
+    }
+
+    LOGGER.info("Successfully deleted the segment {} for the table {}.", _segmentName, _tableName);
+    return true;
+  }
+
+  /**
+   * Retrieve external view for the given table name.
+   * @return TableViews.TableView of OFFLINE and REALTIME segments.
+   */
+  private TableViews.TableView getExternalViewForTable()
+      throws IOException {
+    return JsonUtils.stringToObject(ControllerTest.sendGetRequest(
+        ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forTableExternalView(_tableName)),
+        TableViews.TableView.class);
+  }
+
+  /**
+   * Retrieve the number of segments for both OFFLINE and REALTIME which are in state matching the parameter.
+   * @param state of the segment to be verified in the controller.
+   * @return count for OFFLINE and REALTIME segments.
+   */
+  private long getSegmentCountInState(String state)

Review comment:
       OK, will go with option 2 for now.






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548778238



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Added loop for verification.
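The verification loop added here follows a common poll-with-deadline shape. The sketch below is a generic standalone version of that pattern; the helper name and the interval/timeout values are illustrative, not Pinot's defaults.

```java
import java.util.function.BooleanSupplier;

public class PollUntil {
    // Poll a condition at a fixed interval until it holds or the
    // deadline passes. Returns true if the condition became true in
    // time, false on timeout or interruption.
    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long intervalMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                return false;  // condition never held within the timeout
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // restore interrupt status
                return false;
            }
        }
        return true;
    }
}
```

The upload/delete verification in the diff is this pattern specialized to "segment count in state X reached/cleared", with logging on each retry.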

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();
+    File indexDir = new File(outputDir, segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Added loop for verification.






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549047843



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {
+        if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+          LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+              DEFAULT_MAX_SLEEP_TIME_MS);
+          return false;
+        }
+        LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_WAIT_TIME_MS);
+        Thread.sleep(DEFAULT_WAIT_TIME_MS);
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() > 0) {

Review comment:
       I agree that moving the verification code to a separate method improves readability, but I did not quite understand `We cannot assume that this is the last segment to be deleted`. `SegmentOp` was implemented based on your design, where it is called multiple times during upgrade/downgrade, but each call operates on one segment. Let me know if my understanding is incorrect.
   
   Also, is a segment in any of the states `ONLINE, CONSUMING, ERROR, OFFLINE` considered not deleted?
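One way to resolve the state question above is to treat a segment as deleted only when its name is absent from the external view entirely, regardless of the state its replicas were in. The sketch below is a hypothetical helper built on that assumption, using a plain map in place of `TableViews.TableView`; it is not the PR's actual API.

```java
import java.util.Map;

public class SegmentDeletionCheck {
    // Hypothetical check: a segment counts as deleted only when no entry
    // for it remains in the external view (segment name -> replica states),
    // so ONLINE/CONSUMING/ERROR/OFFLINE entries all mean "not deleted".
    static boolean isDeleted(Map<String, Map<String, String>> externalView, String segmentName) {
        return externalView == null
            || externalView.keySet().stream().noneMatch(s -> s.equalsIgnoreCase(segmentName));
    }
}
```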






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549042872



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/TableOp.java
##########
@@ -104,9 +103,12 @@ boolean runOp() {
 
   private boolean createSchema() {
     try {
-      ControllerTest.sendPostRequest(
-          ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forSchemaCreate(),
-          FileUtils.readFileToString(new File(_schemaFileName)));
+      Map<String, String> headers = new HashMap<String, String>() {{
+        put("Content-type", "application/json");
+      }};
+      ControllerTest
+          .sendPostRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL).forSchemaCreate(),

Review comment:
       Thanks for the fix

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {
+        if ((System.currentTimeMillis() - startTime) > DEFAULT_MAX_SLEEP_TIME_MS) {
+          LOGGER.error("Upload segment verification failed, count is zero after max wait time {} ms.",
+              DEFAULT_MAX_SLEEP_TIME_MS);
+          return false;
+        }
+        LOGGER.warn("Upload segment verification count is zero, will retry after {} ms.", DEFAULT_WAIT_TIME_MS);
+        Thread.sleep(DEFAULT_WAIT_TIME_MS);
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    File indexDir = new File(outputDir, _segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", _segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, _segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() > 0) {

Review comment:
     We cannot assume that this is the last segment to be deleted.
   Better to call a method named `verifySegmentDeleted()`, and have that method fetch the external view and make sure the segment is no longer present in it.
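A rough sketch of the polling check the reviewer is asking for. The external view is abstracted here as a `Supplier<Set<String>>` of segment names, since fetching it from the controller is Pinot-specific; the class and method names are illustrative, not Pinot's actual API:

```java
import java.util.Set;
import java.util.function.Supplier;

public class SegmentChecks {
  // Polls the external view (supplied here as a set of segment names) until
  // the segment disappears or the timeout elapses. Returns true if deleted.
  static boolean verifySegmentDeleted(Supplier<Set<String>> externalViewSegments,
      String segmentName, long timeoutMs, long pollMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    do {
      if (!externalViewSegments.get().contains(segmentName)) {
        return true;  // segment no longer present in the external view
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    } while (System.currentTimeMillis() < deadline);
    return false;  // still listed after the timeout
  }
}
```

Checking membership of the specific segment, rather than comparing counts, stays correct no matter how many other segments the table holds.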

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       Thanks

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +103,172 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    localTempDir.deleteOnExit();
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      long startTime = System.currentTimeMillis();
+      while (getOnlineSegmentCount() <= 0) {

Review comment:
       we can't assume that this is the first segment that we upload. The tests will upload segments to the same table across each upgrade/downgrade. Better to call a method named `verifySegmentInState("ONLINE")` and have that method fetch the external view and make sure that the segment is in the required state. We can then use the same method for realtime tables and consuming segments as well, if needed (as in `verifySegmentInState("CONSUMING")`). Thanks.
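The state-based variant of that check can be sketched the same way. The external view is abstracted as a segment-to-state map supplier, since the controller call is Pinot-specific; names and signatures are illustrative, not Pinot's actual API:

```java
import java.util.Map;
import java.util.function.Supplier;

public class SegmentStateCheck {
  // Polls a segment -> state map (e.g. parsed from the external view) until
  // the segment reaches the expected state or the timeout elapses.
  static boolean verifySegmentInState(Supplier<Map<String, String>> segmentStates,
      String segmentName, String expectedState, long timeoutMs, long pollMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    do {
      if (expectedState.equals(segmentStates.get().get(segmentName))) {
        return true;  // e.g. "ONLINE" for offline tables, "CONSUMING" for realtime
      }
      try {
        Thread.sleep(pollMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    } while (System.currentTimeMillis() < deadline);
    return false;  // segment never reached the expected state
  }
}
```

Parameterizing on the expected state is what lets the same helper serve both the upload check (`"ONLINE"`) and a future realtime consuming-segment check (`"CONSUMING"`).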




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r549163398



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -36,15 +57,23 @@
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class SegmentOp extends BaseOp {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SegmentOp.class);
+  private static final FileFormat DEFAULT_FILE_FORMAT = FileFormat.CSV;
+  private static final String STATE_ONLINE = "ONLINE";
+  private static final int DEFAULT_MAX_SLEEP_TIME_MS = 30000;
+  private static final int DEFAULT_WAIT_TIME_MS = 5000;

Review comment:
       Fixed.






[GitHub] [incubator-pinot] amarnathkarthik commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
amarnathkarthik commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548766320



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       If the process is killed mid-way, the next execution will create a new directory, since the name is suffixed with a UUID, so it should not impact the test. IMO, the general convention when using `FileUtils.getTempDirectory()` is that the file/directory gets deleted on exit, so I don't think it's required; also, the `pinot-compat-test` prefix should provide context in case the files are listed via the OS file system.






[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548768949



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       I am not worried about the next test using the same dir. I am worried about the cruft left behind. If you can confirm that it is indeed deleted, that will be good. Otherwise, please add a deleteOnExit(). Thanks.
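The resolution the thread converges on — `deleteOnExit()` as a safety net plus explicit cleanup in a `finally` block — can be sketched as below. This is a hypothetical standalone version; the real code uses Commons IO's `FileUtils`, and note that `File.deleteOnExit()` only removes a directory at JVM exit if it is empty by then, which is why the explicit recursive delete still matters:

```java
import java.io.File;
import java.util.UUID;

public class TempDirCleanup {
  // Creates a uniquely named temp dir, registering deleteOnExit() as a
  // safety net for abnormal termination.
  static File makeTempDir() {
    File dir = new File(System.getProperty("java.io.tmpdir"),
        "pinot-compat-test-" + UUID.randomUUID());
    dir.mkdirs();
    dir.deleteOnExit();  // JVM-exit safety net; only deletes empty dirs
    return dir;
  }

  // Recursive delete that swallows failures, mirroring the behavior of
  // Commons IO's FileUtils.deleteQuietly used in the finally block.
  static void deleteQuietly(File dir) {
    File[] children = dir.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteQuietly(child);
      }
    }
    dir.delete();  // result intentionally ignored
  }
}
```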






[GitHub] [incubator-pinot] mcvsubbu merged pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu merged pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382


   




[GitHub] [incubator-pinot] mcvsubbu commented on pull request #6382: Compatibility test for segment operations upload and delete

Posted by GitBox <gi...@apache.org>.
mcvsubbu commented on pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#issuecomment-754333769


   Thanks, @amarnathkarthik for your contribution. Also, big thanks for fixing existing bugs in this area.

