You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2021/01/29 22:41:31 UTC

[GitHub] [incubator-pinot] kkrugler opened a new pull request #6506: Fix hadoop batch ingest

kkrugler opened a new pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506


   ## Description
   Numerous fixes and cleanup for the Hadoop batch segment generation code.
   ## Upgrade Notes
   Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
   * [ ] Yes (Please label as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR fix a zero-downtime upgrade introduced earlier?
   * [ ] Yes (Please label this as **<code>backward-incompat</code>**, and complete the section below on Release Notes)
   
   Does this PR otherwise need attention when creating release notes? Things to consider:
   - New configuration options
   - Deprecation of configurations
   - Signature changes to public methods/interfaces
   - New plugins added or old plugins removed
   * [ ] Yes (Please label this PR as **<code>release-notes</code>** and complete the section on Release Notes)
   ## Release Notes
   N/A
   
   ## Documentation
   N/A


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568261109



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       Yeah, I will make the changes for standalone and spark




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568261109



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       Yeah, I will make the changes for standalone and spark




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568261628



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       Hi @fx19880617 I think I fixed this in Standalone a while back (that's what I meant by "It should be fixed in Standalone...", sorry for not being more clear.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r569208664



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       Cool! Making a PR for Spark as well: https://github.com/apache/incubator-pinot/pull/6537




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568803274



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtilsTest.java
##########
@@ -53,4 +53,31 @@ public void testRelativeURIs() throws URISyntaxException {
         "hdfs://namenode2/output/dir/subdir/file.tar.gz");
   }
   
+  // Don't lose authority portion of inputDirURI when creating output files
+  // https://github.com/apache/incubator-pinot/issues/6355
+
+  @Test
+  public void testGetFileURI() throws Exception {
+    // Typical file URI
+    validateFileURI(new URI("file:/path/to/"));
+
+    // Namenode as authority, plus non-standard port
+    validateFileURI(new URI("hdfs://namenode:9999/path/to/"));
+
+    // S3 bucket + path
+    validateFileURI(new URI("s3://bucket/path/to/"));
+
+    // S3 URI with userInfo (username/password)
+    validateFileURI(new URI("s3://username:password@bucket/path/to/"));
+  }
+
+  private void validateFileURI(URI directoryURI) throws URISyntaxException {
+    URI fileURI = new URI(directoryURI.toString() + "file");
+    String rawPath = fileURI.getRawPath();
+
+    Assert.assertEquals(SegmentGenerationUtils.getFileURI(rawPath, fileURI).toString(),

Review comment:
       Hi @fx19880617 - there was a test, for `file:/path/to/`. But thanks for asking, as I added more tests for different types of file URIs, and found an issue in the `getFileURI()` method with `file:///path/to/`. Plus I learned something about file URIs :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568254617



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       It should be fixed in Standalone. I could file an issue for Spark, lmk.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#issuecomment-770088861


   Fix for #6492


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r567583510



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       This is good! I think we should try to fix it in Standalone and Spark as well.

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtilsTest.java
##########
@@ -53,4 +53,31 @@ public void testRelativeURIs() throws URISyntaxException {
         "hdfs://namenode2/output/dir/subdir/file.tar.gz");
   }
   
+  // Don't lose authority portion of inputDirURI when creating output files
+  // https://github.com/apache/incubator-pinot/issues/6355
+
+  @Test
+  public void testGetFileURI() throws Exception {
+    // Typical file URI
+    validateFileURI(new URI("file:/path/to/"));
+
+    // Namenode as authority, plus non-standard port
+    validateFileURI(new URI("hdfs://namenode:9999/path/to/"));
+
+    // S3 bucket + path
+    validateFileURI(new URI("s3://bucket/path/to/"));
+
+    // S3 URI with userInfo (username/password)
+    validateFileURI(new URI("s3://username:password@bucket/path/to/"));
+  }
+
+  private void validateFileURI(URI directoryURI) throws URISyntaxException {
+    URI fileURI = new URI(directoryURI.toString() + "file");
+    String rawPath = fileURI.getRawPath();
+
+    Assert.assertEquals(SegmentGenerationUtils.getFileURI(rawPath, fileURI).toString(),

Review comment:
       Can we also test this `getFileURI` method with the input file path without `hdfs://` protocol?
   

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);

Review comment:
       Shall we also do a check on the job setup phase, so it will fail fast before segment creation?

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);
+      } else {
+        fs.move(sourceFileUri, destFileUri, true);
+      }
+    }
+  }
+
   /**
    * Can be overridden to plug in custom mapper.
    */
   protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
     return HadoopSegmentCreationMapper.class;
   }
 
-  protected void packPluginsToDistributedCache(Job job) {
+  /**
+   * We have to put our jar (which contains the mapper) in the distributed cache and add it to the classpath,
+   * as otherwise it's not available (since the pinot-all jar - which is bigger - is what we've set as our job jar).
+   * 
+   * @param job
+   * @param outputDirFS
+   * @param stagingDirURI
+   * @throws Exception
+   */
+  protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) throws Exception {
+    File ourJar = new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI());
+    Path distributedCacheJar = new Path(stagingDirURI.toString(), ourJar.getName());
+    outputDirFS.copyFromLocalFile(ourJar, distributedCacheJar.toUri());
+    job.addFileToClassPath(distributedCacheJar);
+  }
+  
+  protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) {
     File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
     if (pluginsRootDir.exists()) {
-      File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
       try {
+        File pluginsTarGzFile = File.createTempFile("pinot-plugins-", ".tar.gz");
         TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
-      } catch (IOException e) {
+        
+        // Copy to staging directory
+        Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
+        outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri());
+        job.addCacheFile(cachedPluginsTarball.toUri());
+      } catch (Exception e) {
         LOGGER.error("Failed to tar plugins directory", e);

Review comment:
       make the error msg clear like, Failed to tar and upload plugins directory to staging directory.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#issuecomment-770439684


   Hi @fx19880617 - it was a weird issue. Building locally, I would get a URI to a Pinot plugin jar file with:
   
   ``` java
   SomePintoClassInPluginJar.class.getProtectionDomain().getCodeSource().getLocation().toURI()
   ```
   
   But on the build server, this returns a URI to the directory containing the `SomePintoClassInPluginJar.class` file. I've changed the code to use a class from some dependency jar that's not part of Pinot (as we're just using it to test creating a tarball of files, and adding a dependency file to the distributed cache). So all checks seem to be passing now.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568254949



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);

Review comment:
       Good idea, let me look into that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 merged pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 merged pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568254617



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       It should be fixed in Standalone. I could file an issue for Spark, lmk.

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);

Review comment:
       Good idea, let me look into that

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -208,11 +220,12 @@ public void run()
     } else {
       LOGGER.info("Creating segments with data files: {}", filteredFiles);
       for (int i = 0; i < numDataFiles; i++) {
-        String dataFilePath = filteredFiles.get(i);
-
-        File localFile = new File("tmp");
+        // Typically PinotFS implementations list files without a protocol, so we lose (for example) the
+        // hdfs:// portion of the path. Call getFileURI() to fix this up.
+        URI inputFileURI = SegmentGenerationUtils.getFileURI(filteredFiles.get(i), inputDirURI);

Review comment:
       Hi @fx19880617 I think I fixed this in Standalone a while back (that's what I meant by "It should be fixed in Standalone...", sorry for not being more clear.

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/test/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationUtilsTest.java
##########
@@ -53,4 +53,31 @@ public void testRelativeURIs() throws URISyntaxException {
         "hdfs://namenode2/output/dir/subdir/file.tar.gz");
   }
   
+  // Don't lose authority portion of inputDirURI when creating output files
+  // https://github.com/apache/incubator-pinot/issues/6355
+
+  @Test
+  public void testGetFileURI() throws Exception {
+    // Typical file URI
+    validateFileURI(new URI("file:/path/to/"));
+
+    // Namenode as authority, plus non-standard port
+    validateFileURI(new URI("hdfs://namenode:9999/path/to/"));
+
+    // S3 bucket + path
+    validateFileURI(new URI("s3://bucket/path/to/"));
+
+    // S3 URI with userInfo (username/password)
+    validateFileURI(new URI("s3://username:password@bucket/path/to/"));
+  }
+
+  private void validateFileURI(URI directoryURI) throws URISyntaxException {
+    URI fileURI = new URI(directoryURI.toString() + "file");
+    String rawPath = fileURI.getRawPath();
+
+    Assert.assertEquals(SegmentGenerationUtils.getFileURI(rawPath, fileURI).toString(),

Review comment:
       Hi @fx19880617 - there was a test, for `file:/path/to/`. But thanks for asking, as I added more tests for different types of file URIs, and found an issue in the `getFileURI()` method with `file:///path/to/`. Plus I learned something about file URIs :)

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);

Review comment:
       Unfortunately there's no way to do that before generating the segments, as the segment names (can) depend on the data used to generate the segment, and that's what we need to check for collision in the output directory.

##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);
+      } else {
+        fs.move(sourceFileUri, destFileUri, true);
+      }
+    }
+  }
+
   /**
    * Can be overridden to plug in custom mapper.
    */
   protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
     return HadoopSegmentCreationMapper.class;
   }
 
-  protected void packPluginsToDistributedCache(Job job) {
+  /**
+   * We have to put our jar (which contains the mapper) in the distributed cache and add it to the classpath,
+   * as otherwise it's not available (since the pinot-all jar - which is bigger - is what we've set as our job jar).
+   * 
+   * @param job
+   * @param outputDirFS
+   * @param stagingDirURI
+   * @throws Exception
+   */
+  protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) throws Exception {
+    File ourJar = new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI());
+    Path distributedCacheJar = new Path(stagingDirURI.toString(), ourJar.getName());
+    outputDirFS.copyFromLocalFile(ourJar, distributedCacheJar.toUri());
+    job.addFileToClassPath(distributedCacheJar);
+  }
+  
+  protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) {
     File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
     if (pluginsRootDir.exists()) {
-      File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
       try {
+        File pluginsTarGzFile = File.createTempFile("pinot-plugins-", ".tar.gz");
         TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
-      } catch (IOException e) {
+        
+        // Copy to staging directory
+        Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
+        outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri());
+        job.addCacheFile(cachedPluginsTarball.toUri());
+      } catch (Exception e) {
         LOGGER.error("Failed to tar plugins directory", e);

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#issuecomment-770530393


   > Hi @fx19880617 - it was a weird issue. Building locally, I would get a URI to a Pinot plugin jar file with:
   > 
   > ```java
   > SomePintoClassInPluginJar.class.getProtectionDomain().getCodeSource().getLocation().toURI()
   > ```
   > 
   > But on the build server, this returns a URI to the directory containing the `SomePintoClassInPluginJar.class` file. I've changed the code to use a class from some dependency jar that's not part of Pinot (as we're just using it to test creating a tarball of files, and adding a dependency file to the distributed cache). So all checks seem to be passing now.
   
   I think that's fine. We can just test packaging the jar and ensure the unpacked jar is the same.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#issuecomment-772300880


   Thanks for fixing this!


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568815823



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);

Review comment:
       Unfortunately there's no way to do that before generating the segments, as the segment names (can) depend on the data used to generate the segment, and that's what we need to check for collision in the output directory.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on a change in pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on a change in pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#discussion_r568947541



##########
File path: pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
##########
@@ -269,33 +300,80 @@ public void run()
         throw new RuntimeException("Job failed: " + job);
       }
 
-      LOGGER.info("Trying to copy segment tars from staging directory: [{}] to output directory [{}]", stagingDirURI,
+      LOGGER.info("Moving segment tars from staging directory [{}] to output directory [{}]", stagingDirURI,
           outputDirURI);
-      outputDirFS.copy(new Path(stagingDir, SEGMENT_TAR_DIR).toUri(), outputDirURI);
+      moveFiles(outputDirFS, new Path(stagingDir, SEGMENT_TAR_SUBDIR_NAME).toUri(), outputDirURI, _spec.isOverwriteOutput());
     } finally {
       LOGGER.info("Trying to clean up staging directory: [{}]", stagingDirURI);
       outputDirFS.delete(stagingDirURI, true);
     }
   }
 
+  /**
+   * Move all files from the <sourceDir> to the <destDir>, but don't delete existing contents of destDir.
+   * If <overwrite> is true, and the source file exists in the destination directory, then replace it, otherwise
+   * log a warning and continue. We assume that source and destination directories are on the same filesystem,
+   * so that move() can be used.
+   * 
+   * @param fs 
+   * @param sourceDir
+   * @param destDir
+   * @param overwrite
+   * @throws IOException 
+   * @throws URISyntaxException 
+   */
+  private void moveFiles(PinotFS fs, URI sourceDir, URI destDir, boolean overwrite) throws IOException, URISyntaxException {
+    for (String sourcePath : fs.listFiles(sourceDir, true)) {
+      URI sourceFileUri = SegmentGenerationUtils.getFileURI(sourcePath, sourceDir);
+      String sourceFilename = FilenameUtils.getName(sourceFileUri.getPath());
+      URI destFileUri = SegmentGenerationUtils.getRelativeOutputPath(sourceDir, sourceFileUri, destDir).resolve(sourceFilename);
+      
+      if (!overwrite && fs.exists(destFileUri)) {
+        LOGGER.warn("Can't overwrite existing output segment tar file: {}", destFileUri);
+      } else {
+        fs.move(sourceFileUri, destFileUri, true);
+      }
+    }
+  }
+
   /**
    * Can be overridden to plug in custom mapper.
    */
   protected Class<? extends Mapper<LongWritable, Text, LongWritable, Text>> getMapperClass() {
     return HadoopSegmentCreationMapper.class;
   }
 
-  protected void packPluginsToDistributedCache(Job job) {
+  /**
+   * We have to put our jar (which contains the mapper) in the distributed cache and add it to the classpath,
+   * as otherwise it's not available (since the pinot-all jar - which is bigger - is what we've set as our job jar).
+   * 
+   * @param job
+   * @param outputDirFS
+   * @param stagingDirURI
+   * @throws Exception
+   */
+  protected void addMapperJarToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) throws Exception {
+    File ourJar = new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI());
+    Path distributedCacheJar = new Path(stagingDirURI.toString(), ourJar.getName());
+    outputDirFS.copyFromLocalFile(ourJar, distributedCacheJar.toUri());
+    job.addFileToClassPath(distributedCacheJar);
+  }
+  
+  protected void packPluginsToDistributedCache(Job job, PinotFS outputDirFS, URI stagingDirURI) {
     File pluginsRootDir = new File(PluginManager.get().getPluginsRootDir());
     if (pluginsRootDir.exists()) {
-      File pluginsTarGzFile = new File(PINOT_PLUGINS_TAR_GZ);
       try {
+        File pluginsTarGzFile = File.createTempFile("pinot-plugins-", ".tar.gz");
         TarGzCompressionUtils.createTarGzFile(pluginsRootDir, pluginsTarGzFile);
-      } catch (IOException e) {
+        
+        // Copy to staging directory
+        Path cachedPluginsTarball = new Path(stagingDirURI.toString(), SegmentGenerationUtils.PINOT_PLUGINS_TAR_GZ);
+        outputDirFS.copyFromLocalFile(pluginsTarGzFile, cachedPluginsTarball.toUri());
+        job.addCacheFile(cachedPluginsTarball.toUri());
+      } catch (Exception e) {
         LOGGER.error("Failed to tar plugins directory", e);

Review comment:
       Done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] kkrugler commented on pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
kkrugler commented on pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#issuecomment-770105131


   Hi @fx19880617 - I see that the Hadoop batch ingest unit test is failing, though it passes on my Mac when running `mvn clean test` from inside the `pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/` directory. What's the best way to view the build server test logs?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [incubator-pinot] fx19880617 commented on pull request #6506: Fix hadoop batch ingest

Posted by GitBox <gi...@apache.org>.
fx19880617 commented on pull request #6506:
URL: https://github.com/apache/incubator-pinot/pull/6506#issuecomment-770146194


   > Hi @fx19880617 - I see that the Hadoop batch ingest unit test is failing, though it passes on my Mac when running `mvn clean test` from inside the `pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/` directory. What's the best way to view the build server test logs?
   
   will you be able to see the logs: https://github.com/apache/incubator-pinot/pull/6506/checks?check_run_id=1795216286
   
   Seems that it's trying to copy a directory not a file?
   ```
   [INFO] Running org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunnerTest
   Error:  Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 1.752 s <<< FAILURE! - in org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunnerTest
   Error:  org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunnerTest.testSegmentGeneration  Time elapsed: 1.415 s  <<< FAILURE!
   java.io.IOException: Source '/home/runner/work/incubator-pinot/incubator-pinot/pinot-plugins/pinot-input-format/pinot-csv/target/classes' exists but is a directory
   	at org.apache.commons.io.FileUtils.copyFile(FileUtils.java:1077)
   	at org.apache.commons.io.FileUtils.copyFile(FileUtils.java:1038)
   	at org.apache.pinot.plugin.ingestion.batch.hadoop.HadoopSegmentGenerationJobRunnerTest.testSegmentGeneration(HadoopSegmentGenerationJobRunnerTest.java:98)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:108)
   	at org.testng.internal.Invoker.invokeMethod(Invoker.java:661)
   	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:869)
   	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:1193)
   	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:126)
   	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
   	at org.testng.TestRunner.privateRun(TestRunner.java:744)
   	at org.testng.TestRunner.run(TestRunner.java:602)
   	at org.testng.SuiteRunner.runTest(SuiteRunner.java:380)
   	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:375)
   	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:340)
   	at org.testng.SuiteRunner.run(SuiteRunner.java:289)
   	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
   	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:86)
   	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1301)
   	at org.testng.TestNG.runSuitesLocally(TestNG.java:1226)
   	at org.testng.TestNG.runSuites(TestNG.java:1144)
   	at org.testng.TestNG.run(TestNG.java:1115)
   	at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:136)
   	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:112)
   	at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:99)
   	at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:145)
   	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:428)
   	at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:162)
   	at org.apache.maven.surefire.booter.ForkedBooter.run(ForkedBooter.java:562)
   	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:548)
   
   [INFO] 
   [INFO] Results:
   [INFO] 
   Error:  Failures: 
   Error:    HadoopSegmentGenerationJobRunnerTest.testSegmentGeneration:98 ยป IO Source '/ho...
   [INFO] 
   Error:  Tests run: 1, Failures: 1, Errors: 0, Skipped: 0
   ```
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org