You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/09 22:29:00 UTC

[incubator-pinot] branch pinotfs-copy-ensure-parent-dir-exist updated: Make different PinotFSes have the same behaviors

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch pinotfs-copy-ensure-parent-dir-exist
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/pinotfs-copy-ensure-parent-dir-exist by this push:
     new 5a78693  Make different PinotFSes have the same behaviors
5a78693 is described below

commit 5a786935400b938b31c4a70fb8f915862db60807
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Jan 9 14:28:37 2019 -0800

    Make different PinotFSes have the same behaviors
---
 .../com/linkedin/pinot/filesystem/PinotFS.java     |   4 +-
 .../linkedin/pinot/filesystem/HadoopPinotFS.java   |  99 +++++++---
 .../pinot/filesystem/HadoopPinotFSTest.java        | 220 +++++++++++++++++++++
 3 files changed, 299 insertions(+), 24 deletions(-)

diff --git a/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java b/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java
index f5ca54c..6d3142a 100644
--- a/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java
+++ b/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java
@@ -95,7 +95,7 @@ public abstract class PinotFS implements Closeable {
   public abstract long length(URI fileUri) throws IOException;
 
   /**
-   * Lists all the files at the location provided. Lists recursively.
+   * Lists all the files and directories at the location provided. Lists recursively.
    * Throws exception if this abstract pathname is not valid, or if
    * an I/O error occurs.
    * @param fileUri location of file
@@ -130,7 +130,7 @@ public abstract class PinotFS implements Closeable {
    * @return true if uri is a directory, false otherwise.
    * @throws Exception if uri is not valid or present
    */
-  public abstract boolean isDirectory(URI uri);
+  public abstract boolean isDirectory(URI uri) throws IOException;
 
   /**
    * Returns the age of the file
diff --git a/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java b/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java
index a74f6f2..803bd3b 100644
--- a/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java
+++ b/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java
@@ -22,12 +22,12 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.commons.configuration.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -71,6 +71,9 @@ public class HadoopPinotFS extends PinotFS {
 
   @Override
   public boolean delete(URI segmentUri, boolean forceDelete) throws IOException {
+    if (!exists(segmentUri)) {
+      return true;
+    }
     // Returns false if we are moving a directory and that directory is not empty
     if (isDirectory(segmentUri)
         && listFiles(segmentUri, false).length > 0
@@ -82,8 +85,13 @@ public class HadoopPinotFS extends PinotFS {
 
   @Override
   public boolean move(URI srcUri, URI dstUri, boolean overwrite) throws IOException {
-    if (exists(dstUri) && !overwrite) {
-      return false;
+    if (exists(dstUri)) {
+      if (!overwrite) {
+        return false;
+      } else {
+        delete(dstUri, true);
+        mkdir(dstUri);
+      }
     }
     return _hadoopFS.rename(new Path(srcUri), new Path(dstUri));
   }
@@ -94,28 +102,62 @@ public class HadoopPinotFS extends PinotFS {
    */
   @Override
   public boolean copy(URI srcUri, URI dstUri) throws IOException {
+    if (exists(dstUri)) {
+      delete(dstUri, true);
+    }
+    if (isDirectory(srcUri)) {
+      mkdir(dstUri);
+      doCopyDirectory(srcUri, dstUri);
+    } else {
+      doCopyFile(srcUri, dstUri);
+    }
+    return true;
+  }
+
+  /**
+   * Does the actual copy behavior on directory.
+   */
+  private void doCopyDirectory(URI srcUri, URI dstUri) throws IOException {
     Path source = new Path(srcUri);
-    Path target = new Path(dstUri);
-    RemoteIterator<LocatedFileStatus> sourceFiles = _hadoopFS.listFiles(source, true);
-    if (sourceFiles != null) {
-      while (sourceFiles.hasNext()) {
-        boolean succeeded = FileUtil.copy(_hadoopFS, sourceFiles.next().getPath(), _hadoopFS, target, true, _hadoopConf);
-        if (!succeeded) {
-          return false;
-        }
+    FileStatus[] srcFiles = listStatus(source, true);
+    for (FileStatus srcFile : srcFiles) {
+      Path srcPath = srcFile.getPath();
+      Path dstPath = new Path(dstUri.getPath(), srcFile.getPath().getName());
+      if (isDirectory(srcPath.toUri())) {
+        doCopyDirectory(srcPath.toUri(), dstPath.toUri());
+      } else {
+        doCopyFile(srcPath.toUri(), dstPath.toUri());
       }
     }
-    return true;
   }
 
+  /**
+   * Does the actual copy behavior on file.
+   */
+  private boolean doCopyFile(URI srcUri, URI dstUri) throws IOException {
+    Path source = new Path(srcUri);
+    Path target = new Path(dstUri);
+    URI parentUri = target.getParent().toUri();
+    if (!exists(parentUri)) {
+      mkdir(parentUri);
+    }
+    return FileUtil.copy(_hadoopFS, source, _hadoopFS, target, false, _hadoopConf);
+  }
+
+  /**
+   * Checks whether the file or directory denoted by the given URI exists in the Hadoop file system.
+   */
   @Override
   public boolean exists(URI fileUri) throws IOException {
-    return _hadoopFS.exists(new Path(fileUri));
+    return fileUri != null && _hadoopFS.exists(new Path(fileUri));
   }
 
   @Override
   public long length(URI fileUri) throws IOException {
-    return _hadoopFS.getLength(new Path(fileUri));
+    if (isDirectory(fileUri)) {
+      throw new IllegalArgumentException("File is directory");
+    }
+    return _hadoopFS.getFileStatus(new Path(fileUri)).getLen();
   }
 
   @Override
@@ -123,10 +165,10 @@ public class HadoopPinotFS extends PinotFS {
     ArrayList<String> filePathStrings = new ArrayList<>();
     Path path = new Path(fileUri);
     if (_hadoopFS.exists(path)) {
-      RemoteIterator<LocatedFileStatus> fileListItr = _hadoopFS.listFiles(path, recursive);
-      while (fileListItr != null && fileListItr.hasNext()) {
-        LocatedFileStatus file = fileListItr.next();
-        filePathStrings.add(file.getPath().toUri().toString());
+      // _hadoopFS.listFiles(path, false) will not return directories as files, thus use listStatus(path) here.
+      FileStatus[] files = listStatus(path, recursive);
+      for (FileStatus file : files) {
+        filePathStrings.add(file.getPath().toUri().getRawPath());
       }
     } else {
       throw new IllegalArgumentException("segmentUri is not valid");
@@ -136,6 +178,20 @@ public class HadoopPinotFS extends PinotFS {
     return retArray;
   }
 
+  private FileStatus[] listStatus(Path path, boolean recursive) throws IOException {
+    List<FileStatus> fileStatuses = new ArrayList<>();
+    FileStatus[] files = _hadoopFS.listStatus(path);
+    for (FileStatus file : files) {
+      fileStatuses.add(file);
+      if (file.isDirectory() && recursive) {
+        List<FileStatus> subFiles = Arrays.asList(listStatus(file.getPath(), true));
+        fileStatuses.addAll(subFiles);
+      }
+    }
+    FileStatus[] fileStatusesArr = new FileStatus[fileStatuses.size()];
+    return fileStatuses.toArray(fileStatusesArr);
+  }
+
   @Override
   public void copyToLocalFile(URI srcUri, File dstFile) throws Exception {
     LOGGER.debug("starting to fetch segment from hdfs");
@@ -172,9 +228,8 @@ public class HadoopPinotFS extends PinotFS {
   }
 
   @Override
-  public boolean isDirectory(URI uri) {
-    FileStatus fileStatus = new FileStatus();
-    fileStatus.setPath(new Path(uri));
+  public boolean isDirectory(URI uri) throws IOException {
+    FileStatus fileStatus = _hadoopFS.getFileStatus(new Path(uri));
     return fileStatus.isDirectory();
   }
 
diff --git a/pinot-hadoop-filesystem/src/test/java/com/linkedin/pinot/filesystem/HadoopPinotFSTest.java b/pinot-hadoop-filesystem/src/test/java/com/linkedin/pinot/filesystem/HadoopPinotFSTest.java
new file mode 100644
index 0000000..8089f0e
--- /dev/null
+++ b/pinot-hadoop-filesystem/src/test/java/com/linkedin/pinot/filesystem/HadoopPinotFSTest.java
@@ -0,0 +1,220 @@
+package com.linkedin.pinot.filesystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class HadoopPinotFSTest {
+  private File testFile;
+  private File _absoluteTmpDirPath;
+  private File _newTmpDir;
+  private File _nonExistentTmpFolder;
+  private HadoopPinotFS _hadoopPinotFS;
+
+  @BeforeClass
+  public void setup() {
+    _absoluteTmpDirPath =
+        new File(System.getProperty("java.io.tmpdir"), HadoopPinotFSTest.class.getSimpleName() + "first");
+    FileUtils.deleteQuietly(_absoluteTmpDirPath);
+    Assert.assertTrue(_absoluteTmpDirPath.mkdir(), "Could not make directory " + _absoluteTmpDirPath.getPath());
+    try {
+      testFile = new File(_absoluteTmpDirPath, "testFile");
+      Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
+      Assert.assertTrue(testFile.exists());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    _newTmpDir = new File(System.getProperty("java.io.tmpdir"), HadoopPinotFSTest.class.getSimpleName() + "second");
+    FileUtils.deleteQuietly(_newTmpDir);
+    Assert.assertTrue(_newTmpDir.mkdir(), "Could not make directory " + _newTmpDir.getPath());
+
+    _nonExistentTmpFolder = new File(System.getProperty("java.io.tmpdir"),
+        HadoopPinotFSTest.class.getSimpleName() + "nonExistentParent/nonExistent");
+
+    _absoluteTmpDirPath.deleteOnExit();
+    _newTmpDir.deleteOnExit();
+    _nonExistentTmpFolder.deleteOnExit();
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _absoluteTmpDirPath.delete();
+    _newTmpDir.delete();
+  }
+
+  @Test
+  public void testHadoopPinotFS() throws Exception {
+    _hadoopPinotFS = new HadoopPinotFS();
+
+    final Configuration conf = new PropertiesConfiguration();
+    _hadoopPinotFS.init(conf);
+
+    // Check whether a directory exists
+    Assert.assertTrue(_hadoopPinotFS.exists(_absoluteTmpDirPath.toURI()));
+    Assert.assertTrue(_hadoopPinotFS.lastModified(_absoluteTmpDirPath.toURI()) > 0L);
+    Assert.assertTrue(_hadoopPinotFS.isDirectory(_absoluteTmpDirPath.toURI()));
+
+    URI testFileUri = testFile.toURI();
+    // Check whether a file exists
+    Assert.assertTrue(_hadoopPinotFS.exists(testFileUri));
+    Assert.assertFalse(_hadoopPinotFS.isDirectory(testFileUri));
+
+    File file = new File(_absoluteTmpDirPath, "secondTestFile");
+    URI secondTestFileUri = file.toURI();
+    // Check that file does not exist
+    Assert.assertFalse(_hadoopPinotFS.exists(secondTestFileUri));
+
+    String[] files = _hadoopPinotFS.listFiles(_absoluteTmpDirPath.toURI(), true);
+    Assert.assertEquals(files.length, 1);
+    _hadoopPinotFS.copy(testFileUri, secondTestFileUri);
+    files = _hadoopPinotFS.listFiles(_absoluteTmpDirPath.toURI(), true);
+    Assert.assertEquals(files.length, 2);
+    // Check file copy worked when file was not created
+    Assert.assertTrue(_hadoopPinotFS.exists(secondTestFileUri));
+
+    // Create another file in the same path
+    File thirdTestFile = new File(_absoluteTmpDirPath, "thirdTestFile");
+    Assert.assertTrue(thirdTestFile.createNewFile(), "Could not create file " + thirdTestFile.getPath());
+
+    File newAbsoluteTempDirPath = new File(_absoluteTmpDirPath, "absoluteTwo");
+    Assert.assertTrue(newAbsoluteTempDirPath.mkdir());
+
+    // Create a testDir and file underneath directory
+    File testDir = new File(newAbsoluteTempDirPath, "testDir");
+    Assert.assertTrue(testDir.mkdir(), "Could not make directory " + testDir.getAbsolutePath());
+    File testDirFile = new File(testDir, "testFile");
+    // Assert that recursive list files and nonrecursive list files are as expected
+    Assert.assertTrue(testDirFile.createNewFile(), "Could not create file " + testDir.getAbsolutePath());
+    Assert.assertEquals(_hadoopPinotFS.listFiles(newAbsoluteTempDirPath.toURI(), false),
+        new String[]{testDir.getAbsolutePath()});
+    Assert.assertEquals(_hadoopPinotFS.listFiles(newAbsoluteTempDirPath.toURI(), true),
+        new String[]{testDir.getAbsolutePath(), testDirFile.getAbsolutePath()});
+
+    // Create another parent dir so we can test recursive move
+    File newAbsoluteTempDirPath3 = new File(_absoluteTmpDirPath, "absoluteThree");
+    Assert.assertTrue(newAbsoluteTempDirPath3.mkdir());
+    Assert.assertEquals(newAbsoluteTempDirPath3.listFiles().length, 0);
+
+    _hadoopPinotFS.move(newAbsoluteTempDirPath.toURI(), newAbsoluteTempDirPath3.toURI(), true);
+    Assert.assertFalse(_hadoopPinotFS.exists(newAbsoluteTempDirPath.toURI()));
+    Assert.assertTrue(_hadoopPinotFS.exists(newAbsoluteTempDirPath3.toURI()));
+    Assert.assertTrue(_hadoopPinotFS.exists(new File(newAbsoluteTempDirPath3, "testDir").toURI()));
+    Assert.assertTrue(
+        _hadoopPinotFS.exists(new File(new File(newAbsoluteTempDirPath3, "testDir"), "testFile").toURI()));
+
+    // Check file copy to location where something already exists still works
+    _hadoopPinotFS.copy(testFileUri, thirdTestFile.toURI());
+    // Check length of file
+    Assert.assertEquals(_hadoopPinotFS.length(secondTestFileUri), 0);
+    Assert.assertTrue(_hadoopPinotFS.exists(thirdTestFile.toURI()));
+
+    // Check that method deletes dst directory during move and is successful by overwriting dir
+    Assert.assertTrue(_newTmpDir.exists());
+    // create a file in the dst folder
+    File dstFile = new File(_newTmpDir.getPath() + "/newFile");
+    dstFile.createNewFile();
+
+    // Expected that if the target already exists, a move without overwrite will not succeed
+    Assert.assertFalse(_hadoopPinotFS.move(_absoluteTmpDirPath.toURI(), _newTmpDir.toURI(), false));
+
+    int numFiles = _absoluteTmpDirPath.listFiles().length;
+    Assert.assertTrue(_hadoopPinotFS.move(_absoluteTmpDirPath.toURI(), _newTmpDir.toURI(), true));
+    Assert.assertEquals(_absoluteTmpDirPath.length(), 0);
+    Assert.assertEquals(_newTmpDir.listFiles().length, numFiles);
+    Assert.assertFalse(dstFile.exists());
+
+    // Check that copying a file to a non-existent destination folder will work
+    FileUtils.deleteQuietly(_nonExistentTmpFolder);
+    Assert.assertFalse(_nonExistentTmpFolder.exists());
+    File srcFile = new File(_absoluteTmpDirPath, "srcFile");
+    _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+    Assert.assertTrue(srcFile.createNewFile());
+    dstFile = new File(_nonExistentTmpFolder.getPath() + "/newFile");
+    Assert.assertFalse(dstFile.exists());
+    Assert.assertTrue(_hadoopPinotFS.copy(srcFile.toURI(), dstFile.toURI()));
+    Assert.assertTrue(srcFile.exists());
+    Assert.assertTrue(dstFile.exists());
+
+    //Check that copying a folder to a non-existent destination folder works
+    FileUtils.deleteQuietly(_nonExistentTmpFolder);
+    Assert.assertFalse(_nonExistentTmpFolder.exists());
+    _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+    dstFile = new File(_nonExistentTmpFolder.getPath() + "/srcFile");
+    Assert.assertFalse(dstFile.exists());
+    Assert.assertTrue(_hadoopPinotFS.copy(_absoluteTmpDirPath.toURI(), _nonExistentTmpFolder.toURI()));
+    Assert.assertTrue(dstFile.exists());
+    FileUtils.deleteQuietly(srcFile);
+    Assert.assertFalse(srcFile.exists());
+
+    // Check that moving a file to a non-existent destination folder will work
+    FileUtils.deleteQuietly(_nonExistentTmpFolder);
+    Assert.assertFalse(_nonExistentTmpFolder.exists());
+    srcFile = new File(_absoluteTmpDirPath, "srcFile");
+    _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+    Assert.assertTrue(srcFile.createNewFile());
+    dstFile = new File(_nonExistentTmpFolder.getPath() + "/newFile");
+    Assert.assertFalse(dstFile.exists());
+    Assert.assertTrue(_hadoopPinotFS.move(srcFile.toURI(), dstFile.toURI(), true)); // overwrite flag has no impact
+    Assert.assertFalse(srcFile.exists());
+    Assert.assertTrue(dstFile.exists());
+
+    //Check that moving a folder to a non-existent destination folder works
+    FileUtils.deleteQuietly(_nonExistentTmpFolder);
+    Assert.assertFalse(_nonExistentTmpFolder.exists());
+    srcFile = new File(_absoluteTmpDirPath, "srcFile");
+    _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+    Assert.assertTrue(srcFile.createNewFile());
+    dstFile = new File(_nonExistentTmpFolder.getPath() + "/srcFile");
+    Assert.assertFalse(dstFile.exists());
+    Assert.assertTrue(_hadoopPinotFS.move(_absoluteTmpDirPath.toURI(), _nonExistentTmpFolder.toURI(),
+        true)); // overwrite flag has no impact
+    Assert.assertTrue(dstFile.exists());
+
+    _hadoopPinotFS.delete(secondTestFileUri, true);
+    // Check deletion from final location worked
+    Assert.assertFalse(_hadoopPinotFS.exists(secondTestFileUri));
+
+    File firstTempDir = new File(_absoluteTmpDirPath, "firstTempDir");
+    File secondTempDir = new File(_absoluteTmpDirPath, "secondTempDir");
+    _hadoopPinotFS.mkdir(firstTempDir.toURI());
+    Assert.assertTrue(firstTempDir.exists(), "Could not make directory " + firstTempDir.getPath());
+
+    // Check that directory only copy worked
+    _hadoopPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
+    Assert.assertTrue(_hadoopPinotFS.exists(secondTempDir.toURI()));
+
+    // Copying directory with files to directory with files
+    File testFile = new File(firstTempDir, "testFile");
+    Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
+    File newTestFile = new File(secondTempDir, "newTestFile");
+    Assert.assertTrue(newTestFile.createNewFile(), "Could not create file " + newTestFile.getPath());
+
+    _hadoopPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
+    files = _hadoopPinotFS.listFiles(secondTempDir.toURI(), true);
+    Assert.assertEquals(files.length, 1);
+
+    // Calling length() on a directory should throw an exception.
+    try {
+      _hadoopPinotFS.length(firstTempDir.toURI());
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+
+    }
+
+    Assert.assertTrue(testFile.exists());
+
+    _hadoopPinotFS.copyFromLocalFile(testFile, secondTestFileUri);
+    Assert.assertTrue(_hadoopPinotFS.exists(secondTestFileUri));
+    _hadoopPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
+    Assert.assertTrue(_hadoopPinotFS.exists(secondTestFileUri));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org