You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/02/25 21:19:13 UTC

[GitHub] [incubator-pinot] Jackie-Jiang commented on a change in pull request #6598: Default to use local directory sequence id for segment name generation

Jackie-Jiang commented on a change in pull request #6598:
URL: https://github.com/apache/incubator-pinot/pull/6598#discussion_r583206821



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
##########
@@ -60,7 +60,9 @@
   public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
 
   // Assign sequence ids to input files based at each local directory level
-  public static final String LOCAL_DIRECTORY_SEQUENCE_ID = "local.directory.sequence.id";
+  @Deprecated
+  public static final String DEPRECATED_USE_LOCAL_DIRECTORY_SEQUENCE_ID = "local.directory.sequence.id";
+  public static final String USE_LOCAL_DIRECTORY_SEQUENCE_ID = "use.local.directory.sequence.id";

Review comment:
       If we use the local directory sequence id by default, then I would suggest making the config flag control the global sequence id instead, so that the flag defaults to false.

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentGenerationJobRunner.java
##########
@@ -152,86 +188,106 @@ public void run()
           continue;
         }
       }
-      if (!inputDirFS.isDirectory(new URI(file))) {
+      if (!_inputDirFS.isDirectory(new URI(file))) {
         filteredFiles.add(file);
       }
     }
     File localTempDir = new File(FileUtils.getTempDirectory(), "pinot-" + UUID.randomUUID());
     try {
-      //create localTempDir for input and output
-      File localInputTempDir = new File(localTempDir, "input");
-      FileUtils.forceMkdir(localInputTempDir);
-      File localOutputTempDir = new File(localTempDir, "output");
-      FileUtils.forceMkdir(localOutputTempDir);
-
-      //Read TableConfig, Schema
-      Schema schema = SegmentGenerationUtils.getSchema(_spec.getTableSpec().getSchemaURI());
-      TableConfig tableConfig = SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI());
-
       int numInputFiles = filteredFiles.size();
-      CountDownLatch segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
-      //iterate on the file list, for each
-      for (int i = 0; i < numInputFiles; i++) {
-        final URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);
-
-        //copy input path to local
-        File localInputDataFile = new File(localInputTempDir, new File(inputFileURI.getPath()).getName());
-        inputDirFS.copyToLocalFile(inputFileURI, localInputDataFile);
-
-        //create task spec
-        SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
-        taskSpec.setInputFilePath(localInputDataFile.getAbsolutePath());
-        taskSpec.setOutputDirectoryPath(localOutputTempDir.getAbsolutePath());
-        taskSpec.setRecordReaderSpec(_spec.getRecordReaderSpec());
-        taskSpec.setSchema(schema);
-        taskSpec.setTableConfig(tableConfig);
-        taskSpec.setSequenceId(i);
-        taskSpec.setSegmentNameGeneratorSpec(_spec.getSegmentNameGeneratorSpec());
-        taskSpec.setCustomProperty(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY, inputFileURI.toString());
-
-        LOGGER.info("Submitting one Segment Generation Task for {}", inputFileURI);
-        _executorService.submit(() -> {
-          File localSegmentDir = null;
-          File localSegmentTarFile = null;
-          try {
-            //invoke segmentGenerationTask
-            SegmentGenerationTaskRunner taskRunner = new SegmentGenerationTaskRunner(taskSpec);
-            String segmentName = taskRunner.run();
-            // Tar segment directory to compress file
-            localSegmentDir = new File(localOutputTempDir, segmentName);
-            String segmentTarFileName = URLEncoder.encode(segmentName + Constants.TAR_GZ_FILE_EXT, "UTF-8");
-            localSegmentTarFile = new File(localOutputTempDir, segmentTarFileName);
-            LOGGER.info("Tarring segment from: {} to: {}", localSegmentDir, localSegmentTarFile);
-            TarGzCompressionUtils.createTarGzFile(localSegmentDir, localSegmentTarFile);
-            long uncompressedSegmentSize = FileUtils.sizeOf(localSegmentDir);
-            long compressedSegmentSize = FileUtils.sizeOf(localSegmentTarFile);
-            LOGGER.info("Size for segment: {}, uncompressed: {}, compressed: {}", segmentName,
-                DataSizeUtils.fromBytes(uncompressedSegmentSize), DataSizeUtils.fromBytes(compressedSegmentSize));
-            //move segment to output PinotFS
-            URI outputSegmentTarURI =
-                SegmentGenerationUtils.getRelativeOutputPath(inputDirURI, inputFileURI, outputDirURI)
-                    .resolve(segmentTarFileName);
-            if (!_spec.isOverwriteOutput() && outputDirFS.exists(outputSegmentTarURI)) {
-              LOGGER
-                  .warn("Not overwrite existing output segment tar file: {}", outputDirFS.exists(outputSegmentTarURI));
-            } else {
-              outputDirFS.copyFromLocalFile(localSegmentTarFile, outputSegmentTarURI);
-            }
-          } catch (Exception e) {
-            LOGGER.error("Failed to generate Pinot segment for file - {}", inputFileURI, e);
-          } finally {
-            segmentCreationTaskCountDownLatch.countDown();
-            FileUtils.deleteQuietly(localSegmentDir);
-            FileUtils.deleteQuietly(localSegmentTarFile);
-            FileUtils.deleteQuietly(localInputDataFile);
+      _segmentCreationTaskCountDownLatch = new CountDownLatch(numInputFiles);
+
+      if (SegmentGenerationJobUtils.useLocalDirectorySequenceId(_spec.getSegmentNameGeneratorSpec())) {
+        Map<String, List<String>> localDirIndex = new HashMap<>();
+        for (String filteredFile : filteredFiles) {
+          java.nio.file.Path filteredParentPath = Paths.get(filteredFile).getParent();
+          if (!localDirIndex.containsKey(filteredParentPath.toString())) {
+            localDirIndex.put(filteredParentPath.toString(), new ArrayList<>());
+          }
+          localDirIndex.get(filteredParentPath.toString()).add(filteredFile);

Review comment:
       ```suggestion
             localDirIndex.computeIfAbsent(filteredParentPath.toString(), k -> new ArrayList<>()).add(filteredFile);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org