You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2020/12/24 23:50:06 UTC

[GitHub] [incubator-pinot] mcvsubbu commented on a change in pull request #6382: Compatibility test for segment operations upload and delete

mcvsubbu commented on a change in pull request #6382:
URL: https://github.com/apache/incubator-pinot/pull/6382#discussion_r548763761



##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());

Review comment:
       It may be better to also call `deleteOnExit()` on this file handle, just in case someone starts the test suite and kills it midway.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Since it takes some time for the segment to make it to the external view, it is best to put this in a while loop. I suggest:
   ```
   while (segmentNotOnline()) {
     sleep(100ms)
     if (it took more than max time) {
       return false
     }
   }
   ```
   Max time can be hardcoded as 30s.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Also, in general we may have more than one replica of the segment, so it may be better to parse the external view for all replicas. True, we will be having only one replica to start with, but I would like to be able to extend the tests along that dimension if needed.

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();

Review comment:
       Why do we have this? Why not just use `_segmentName`?

##########
File path: pinot-integration-tests/src/test/java/org/apache/pinot/compat/tests/SegmentOp.java
##########
@@ -82,14 +107,175 @@ public void setTableConfigFileName(String tableConfigFileName) {
     _tableConfigFileName = tableConfigFileName;
   }
 
+  public void setSchemaFileName(String schemaFileName) {
+    _schemaFileName = schemaFileName;
+  }
+
+  public String getSchemaFileName() {
+    return _schemaFileName;
+  }
+
+  public void setRecordReaderConfigFileName(String recordReaderConfigFileName) {
+    _recordReaderConfigFileName = recordReaderConfigFileName;
+  }
+
+  public String getRecordReaderConfigFileName() {
+    return _recordReaderConfigFileName;
+  }
+
+  public void setSegmentName(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
   @Override
   boolean runOp() {
-    switch(_op) {
+    switch (_op) {
       case UPLOAD:
-        System.out.println("Generating segment " + _segmentName + " from " + _inputDataFileName + " and uploading to " +
-            _tableConfigFileName);
+        return createAndUploadSegments();
       case DELETE:
+        return deleteSegment();
     }
     return true;
   }
+
+  /**
+   * Create Segment file, compress to TarGz, and upload the files to controller.
+   * @return true if all successful, false in case of failure.
+   */
+  private boolean createAndUploadSegments() {
+    File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-compat-test-" + UUID.randomUUID());
+    File localOutputTempDir = new File(localTempDir, "output");
+    try {
+      FileUtils.forceMkdir(localOutputTempDir);
+      File segmentTarFile = generateSegment(localOutputTempDir);
+      uploadSegment(segmentTarFile);
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());
+      if (onlineSegmentCount.getFirst() <= 0 && onlineSegmentCount.getSecond() <= 0) {
+        LOGGER.error("Uploaded segment {} not found or not in {} state.", _segmentName, STATE_ONLINE);
+        return false;
+      }
+      LOGGER.info("Successfully verified segment {} and its current status is {}.", _segmentName, STATE_ONLINE);
+
+      return true;
+    } catch (Exception e) {
+      LOGGER.error("Failed to create and upload segment for input data file {}.", _inputDataFileName, e);
+      return false;
+    } finally {
+      FileUtils.deleteQuietly(localTempDir);
+    }
+  }
+
+  /**
+   * Generate the Segment(s) and then compress to TarGz file. Supports generation of segment files for one input data
+   * file.
+   * @param outputDir to generate the Segment file(s).
+   * @return File object of the TarGz compressed segment file.
+   * @throws Exception while generating segment files and/or compressing to TarGz.
+   */
+  private File generateSegment(File outputDir)
+      throws Exception {
+    TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+    _tableName = tableConfig.getTableName();
+
+    Schema schema = JsonUtils.fileToObject(new File(_schemaFileName), Schema.class);
+    RecordReaderConfig recordReaderConfig =
+        RecordReaderFactory.getRecordReaderConfig(DEFAULT_FILE_FORMAT, _recordReaderConfigFileName);
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    segmentGeneratorConfig.setInputFilePath(_inputDataFileName);
+    segmentGeneratorConfig.setFormat(DEFAULT_FILE_FORMAT);
+    segmentGeneratorConfig.setOutDir(outputDir.getAbsolutePath());
+    segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+    segmentGeneratorConfig.setTableName(_tableName);
+    segmentGeneratorConfig.setSegmentName(_segmentName);
+
+    SegmentIndexCreationDriver driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig);
+    driver.build();
+    String segmentName = driver.getSegmentName();
+    File indexDir = new File(outputDir, segmentName);
+    LOGGER.info("Successfully created segment: {} at directory: {}", segmentName, indexDir);
+    File segmentTarFile = new File(outputDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+    TarGzCompressionUtils.createTarGzFile(indexDir, segmentTarFile);
+    LOGGER.info("Tarring segment from: {} to: {}", indexDir, segmentTarFile);
+
+    return segmentTarFile;
+  }
+
+  /**
+   * Upload the TarGz Segment file to the controller.
+   * @param segmentTarFile TarGz Segment file
+   * @throws Exception when upload segment fails.
+   */
+  private void uploadSegment(File segmentTarFile)
+      throws Exception {
+    URI controllerURI = FileUploadDownloadClient.getUploadSegmentURI(new URI(ClusterDescriptor.CONTROLLER_URL));
+    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
+      fileUploadDownloadClient.uploadSegment(controllerURI, segmentTarFile.getName(), segmentTarFile, _tableName);
+    }
+  }
+
+  /**
+   * Deletes the segment for the given segment name and table name.
+   * @return true if delete successful, else false.
+   */
+  private boolean deleteSegment() {
+    try {
+      TableConfig tableConfig = JsonUtils.fileToObject(new File(_tableConfigFileName), TableConfig.class);
+      _tableName = tableConfig.getTableName();
+
+      ControllerTest.sendDeleteRequest(ControllerRequestURLBuilder.baseUrl(ClusterDescriptor.CONTROLLER_URL)
+          .forSegmentDelete(_tableName, _segmentName));
+
+      Pair<Long, Long> onlineSegmentCount = getOnlineSegmentCount(getTableExternalView());

Review comment:
       Same comment about looping with sleep and handling multiple replicas.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org