You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by mc...@apache.org on 2022/02/08 23:10:49 UTC

[pinot] branch master updated: Disable recursion in PinotFS copy (#8162)

This is an automated email from the ASF dual-hosted git repository.

mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 1382d29  Disable recursion in PinotFS copy (#8162)
1382d29 is described below

commit 1382d293236d40e80d1652729fdb1632f92f7e48
Author: Subbu Subramaniam <mc...@users.noreply.github.com>
AuthorDate: Tue Feb 8 15:10:30 2022 -0800

    Disable recursion in PinotFS copy (#8162)
    
    * Disable recursion in PinotFS copy
    
    All uses of PinotFS copy API involve copying a tarred segment and
    untarring it. So, copying a directory recursively will not work (the
    untar will fail). It also results in wastage of effort in copying
    across file systems.
    
    Also disabled the file scheme during segment upload on the controller,
    since the URL-based upload is meant to provide an external URL to be
    picked up by the controller.
    
    * Removed redundant comment
    
    * Fix to add a different API for recursive copy
    
    * Fix lint errors
---
 .../utils/fetcher/SegmentFetcherFactory.java       |  1 +
 .../PinotSegmentUploadDownloadRestletResource.java |  5 ++++
 .../hadoop/HadoopSegmentGenerationJobRunner.java   |  6 ++---
 .../pinot/plugin/filesystem/HadoopPinotFS.java     | 15 +++++++++++
 .../apache/pinot/spi/filesystem/LocalPinotFS.java  | 25 ++++++++++++++-----
 .../org/apache/pinot/spi/filesystem/PinotFS.java   | 16 ++++++++++--
 .../pinot/spi/filesystem/LocalPinotFSTest.java     | 29 ----------------------
 7 files changed, 57 insertions(+), 40 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
index c6d71e8..743cd1b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java
@@ -144,6 +144,7 @@ public class SegmentFetcherFactory {
 
   private void fetchSegmentToLocalInternal(URI uri, File dest)
       throws Exception {
+    // caller untars
     getSegmentFetcher(uri.getScheme()).fetchSegmentToLocal(uri, dest);
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
index 484bc30..253a3fb 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java
@@ -381,6 +381,12 @@ public class PinotSegmentUploadDownloadRestletResource {
     }
     LOGGER.info("Downloading segment from {} to {} for table {}", currentSegmentLocationURI, destFile.getAbsolutePath(),
         tableName);
+    // URL-based upload must point at an external location: reject local (file:) and scheme-less URIs.
+    URI uri = new URI(currentSegmentLocationURI);
+    if (uri.getScheme() == null || uri.getScheme().equalsIgnoreCase("file")) {
+      throw new ControllerApplicationException(LOGGER, "Unsupported URI: " + currentSegmentLocationURI,
+          Response.Status.BAD_REQUEST);
+    }
     SegmentFetcherFactory.fetchSegmentToLocal(currentSegmentLocationURI, destFile);
   }
 
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
index 4cbfdf5..0608fd7 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-hadoop/src/main/java/org/apache/pinot/plugin/ingestion/batch/hadoop/HadoopSegmentGenerationJobRunner.java
@@ -259,7 +259,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
       job.getConfiguration().setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
       // But we have to copy ourselves to HDFS, and add us to the distributed cache, so
-      // that the mapper code is available. 
+      // that the mapper code is available.
       addMapperJarToDistributedCache(job, outputDirFS, stagingDirURI);
 
       org.apache.hadoop.conf.Configuration jobConf = job.getConfiguration();
@@ -277,7 +277,7 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
       // In order to ensure pinot plugins would be loaded to each worker, this method
       // tars entire plugins directory and set this file into Distributed cache.
       // Then each mapper job will untar the plugin tarball, and set system properties accordingly.
-      // Note that normally we'd just use Hadoop's support for putting jars on the 
+      // Note that normally we'd just use Hadoop's support for putting jars on the
       // classpath via the distributed cache, but some of the plugins (e.g. the pinot-parquet
       // input format) include Hadoop classes, which can be incompatibile with the Hadoop
       // installation/jars being used to run the mapper, leading to errors such as:
@@ -386,7 +386,8 @@ public class HadoopSegmentGenerationJobRunner extends Configured implements Inge
       throws Exception {
     File ourJar = new File(getClass().getProtectionDomain().getCodeSource().getLocation().toURI());
     Path distributedCacheJar = new Path(stagingDirURI.toString(), ourJar.getName());
+    // ourJar is expected to be a jar file (not a directory), so use the non-recursive copy API.
     outputDirFS.copyFromLocalFile(ourJar, distributedCacheJar.toUri());
     job.addFileToClassPath(distributedCacheJar);
   }
 
diff --git a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
index 760c6d2..5136c10 100644
--- a/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
+++ b/pinot-plugins/pinot-file-system/pinot-hdfs/src/main/java/org/apache/pinot/plugin/filesystem/HadoopPinotFS.java
@@ -183,6 +183,9 @@ public class HadoopPinotFS extends BasePinotFS {
       if (_hadoopFS == null) {
         throw new RuntimeException("_hadoopFS client is not initialized when trying to copy files");
       }
+      if (_hadoopFS.isDirectory(remoteFile)) {
+        throw new IllegalArgumentException(srcUri.toString() + " is a directory");
+      }
       long startMs = System.currentTimeMillis();
       _hadoopFS.copyToLocalFile(remoteFile, localFile);
       LOGGER.debug("copied {} from hdfs to {} in local for size {}, take {} ms", srcUri, dstFilePath, dstFile.length(),
@@ -196,9 +199,22 @@ public class HadoopPinotFS extends BasePinotFS {
   @Override
   public void copyFromLocalFile(File srcFile, URI dstUri)
       throws Exception {
+    if (srcFile.isDirectory()) {
+      throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is a directory");
+    }
     _hadoopFS.copyFromLocalFile(new Path(srcFile.toURI()), new Path(dstUri));
   }
 
+  @Override
+  public void copyFromLocalDir(File srcFile, URI dstUri)
+      throws Exception {
+    // Check the local source via java.io.File; _hadoopFS may be a non-local FileSystem (e.g. HDFS).
+    if (!srcFile.isDirectory()) {
+      throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is not a directory");
+    }
+    _hadoopFS.copyFromLocalFile(new Path(srcFile.toURI()), new Path(dstUri));
+  }
+
   @Override
   public boolean isDirectory(URI uri) {
     try {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
index cca7a15..4a87f3b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/LocalPinotFS.java
@@ -85,7 +85,7 @@ public class LocalPinotFS extends BasePinotFS {
   @Override
   public boolean copy(URI srcUri, URI dstUri)
       throws IOException {
-    copy(toFile(srcUri), toFile(dstUri));
+    copy(toFile(srcUri), toFile(dstUri), false);
     return true;
   }
 
@@ -118,13 +118,22 @@ public class LocalPinotFS extends BasePinotFS {
   @Override
   public void copyToLocalFile(URI srcUri, File dstFile)
       throws Exception {
-    copy(toFile(srcUri), dstFile);
+    copy(toFile(srcUri), dstFile, false);
   }
 
   @Override
   public void copyFromLocalFile(File srcFile, URI dstUri)
       throws Exception {
-    copy(srcFile, toFile(dstUri));
+    copy(srcFile, toFile(dstUri), false);
+  }
+
+  @Override
+  public void copyFromLocalDir(File srcFile, URI dstUri)
+      throws Exception {
+    if (!srcFile.isDirectory()) {
+      throw new IllegalArgumentException(srcFile.getAbsolutePath() + " is not a directory");
+    }
+    copy(srcFile, toFile(dstUri), true);
   }
 
   @Override
@@ -163,14 +172,19 @@ public class LocalPinotFS extends BasePinotFS {
     }
   }
 
-  private static void copy(File srcFile, File dstFile)
+  private static void copy(File srcFile, File dstFile, boolean recursive)
       throws IOException {
+    // Reject a non-recursive directory copy before touching dstFile, so that a refused copy
+    // does not delete an existing destination.
+    if (!recursive && srcFile.isDirectory()) {
+      throw new IOException(srcFile.getAbsolutePath() + " is a directory");
+    }
     if (dstFile.exists()) {
       FileUtils.deleteQuietly(dstFile);
     }
     if (srcFile.isDirectory()) {
-      // Throws Exception on failure
+      // Only reached when recursive is true; throws Exception on failure
       FileUtils.copyDirectory(srcFile, dstFile);
     } else {
       // Will create parent directories, throws Exception on failure
       FileUtils.copyFile(srcFile, dstFile);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
index 17d4032..af0d45b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/filesystem/PinotFS.java
@@ -143,7 +143,7 @@ public interface PinotFS extends Closeable, Serializable {
 
   /**
    * Copies a file from a remote filesystem to the local one. Keeps the original file.
-   * @param srcUri location of current file on remote filesystem
+   * @param srcUri location of current file on remote filesystem (must not be a directory)
    * @param dstFile location of destination on local filesystem
    * @throws Exception if srcUri is not valid or not present, or timeout when downloading file to local
    */
@@ -151,9 +151,24 @@ public interface PinotFS extends Closeable, Serializable {
       throws Exception;
 
   /**
+   * Copies a local directory, including its contents, to the given destination on this filesystem. The source
+   * is kept intact afterwards.
+   *
+   * @apiNote This API is to be used with caution, since recursive copies can lead to adverse situations.
+   * @param srcFile location of src directory on local disk (must be a directory)
+   * @param dstUri location of dst on remote filesystem
+   * @throws Exception if srcFile is not a directory, or the copy to dstUri fails or times out
+   * @throws UnsupportedOperationException by default, for filesystems that do not implement recursive copy
+   */
+  default void copyFromLocalDir(File srcFile, URI dstUri)
+      throws Exception {
+    throw new UnsupportedOperationException("Recursive copy not supported");
+  }
+
+  /**
    * The src file is on the local disk. Add it to filesystem at the given dst name and the source is kept intact
    * afterwards.
-   * @param srcFile location of src file on local disk
+   * @param srcFile location of src file on local disk (must not be a directory)
    * @param dstUri location of dst on remote filesystem
    * @throws Exception if fileUri is not valid or not present, or timeout when uploading file from local
    */
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
index 7f6e387..c4bc5d2 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/filesystem/LocalPinotFSTest.java
@@ -205,28 +205,6 @@ public class LocalPinotFSTest {
       // Expected.
     }
 
-    // Check that directory only copy worked
-    localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
-    Assert.assertTrue(localPinotFS.exists(secondTempDir.toURI()));
-
-    // Copying directory with files to directory with files
-    File testFile = new File(firstTempDir, "testFile");
-    Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
-    File newTestFile = new File(secondTempDir, "newTestFile");
-    Assert.assertTrue(newTestFile.createNewFile(), "Could not create file " + newTestFile.getPath());
-
-    localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
-    Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), true).length, 1);
-
-    // Copying directory with files under another directory.
-    File firstTempDirUnderSecondTempDir = new File(secondTempDir, firstTempDir.getName());
-    localPinotFS.copy(firstTempDir.toURI(), firstTempDirUnderSecondTempDir.toURI());
-    Assert.assertTrue(localPinotFS.exists(firstTempDirUnderSecondTempDir.toURI()));
-    // There're two files/directories under secondTempDir.
-    Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 2);
-    // The file under src directory also got copied under dst directory.
-    Assert.assertEquals(localPinotFS.listFiles(firstTempDirUnderSecondTempDir.toURI(), true).length, 1);
-
     // len of dir = exception
     try {
       localPinotFS.length(firstTempDir.toURI());
@@ -234,12 +212,5 @@ public class LocalPinotFSTest {
     } catch (IllegalArgumentException e) {
 
     }
-
-    Assert.assertTrue(testFile.exists());
-
-    localPinotFS.copyFromLocalFile(testFile, secondTestFileUri);
-    Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
-    localPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
-    Assert.assertTrue(localPinotFS.exists(secondTestFileUri));
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org