You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/01/09 22:29:00 UTC
[incubator-pinot] branch pinotfs-copy-ensure-parent-dir-exist
updated: Make different PinotFSes have the same behaviors
This is an automated email from the ASF dual-hosted git repository.
jlli pushed a commit to branch pinotfs-copy-ensure-parent-dir-exist
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/pinotfs-copy-ensure-parent-dir-exist by this push:
new 5a78693 Make different PinotFSes have the same behaviors
5a78693 is described below
commit 5a786935400b938b31c4a70fb8f915862db60807
Author: Jack Li(Analytics Engineering) <jl...@jlli-mn1.linkedin.biz>
AuthorDate: Wed Jan 9 14:28:37 2019 -0800
Make different PinotFSes have the same behaviors
---
.../com/linkedin/pinot/filesystem/PinotFS.java | 4 +-
.../linkedin/pinot/filesystem/HadoopPinotFS.java | 99 +++++++---
.../pinot/filesystem/HadoopPinotFSTest.java | 220 +++++++++++++++++++++
3 files changed, 299 insertions(+), 24 deletions(-)
diff --git a/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java b/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java
index f5ca54c..6d3142a 100644
--- a/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java
+++ b/pinot-filesystem/src/main/java/com/linkedin/pinot/filesystem/PinotFS.java
@@ -95,7 +95,7 @@ public abstract class PinotFS implements Closeable {
public abstract long length(URI fileUri) throws IOException;
/**
- * Lists all the files at the location provided. Lists recursively.
+ * Lists all the files and directories at the location provided. Lists recursively.
* Throws exception if this abstract pathname is not valid, or if
* an I/O error occurs.
* @param fileUri location of file
@@ -130,7 +130,7 @@ public abstract class PinotFS implements Closeable {
* @return true if uri is a directory, false otherwise.
* @throws Exception if uri is not valid or present
*/
- public abstract boolean isDirectory(URI uri);
+ public abstract boolean isDirectory(URI uri) throws IOException;
/**
* Returns the age of the file
diff --git a/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java b/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java
index a74f6f2..803bd3b 100644
--- a/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java
+++ b/pinot-hadoop-filesystem/src/main/java/com/linkedin/pinot/filesystem/HadoopPinotFS.java
@@ -22,12 +22,12 @@ import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
import org.apache.commons.configuration.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,6 +71,9 @@ public class HadoopPinotFS extends PinotFS {
@Override
public boolean delete(URI segmentUri, boolean forceDelete) throws IOException {
+ if (!exists(segmentUri)) {
+ return true;
+ }
// Returns false if we are moving a directory and that directory is not empty
if (isDirectory(segmentUri)
&& listFiles(segmentUri, false).length > 0
@@ -82,8 +85,13 @@ public class HadoopPinotFS extends PinotFS {
@Override
public boolean move(URI srcUri, URI dstUri, boolean overwrite) throws IOException {
- if (exists(dstUri) && !overwrite) {
- return false;
+ if (exists(dstUri)) {
+ if (!overwrite) {
+ return false;
+ } else {
+ delete(dstUri, true);
+ mkdir(dstUri);
+ }
}
return _hadoopFS.rename(new Path(srcUri), new Path(dstUri));
}
@@ -94,28 +102,62 @@ public class HadoopPinotFS extends PinotFS {
*/
@Override
public boolean copy(URI srcUri, URI dstUri) throws IOException {
+ if (exists(dstUri)) {
+ delete(dstUri, true);
+ }
+ if (isDirectory(srcUri)) {
+ mkdir(dstUri);
+ doCopyDirectory(srcUri, dstUri);
+ } else {
+ doCopyFile(srcUri, dstUri);
+ }
+ return true;
+ }
+
+ /**
+ * Recursively copies the contents of the source directory into the destination directory.
+ */
+ private void doCopyDirectory(URI srcUri, URI dstUri) throws IOException {
Path source = new Path(srcUri);
- Path target = new Path(dstUri);
- RemoteIterator<LocatedFileStatus> sourceFiles = _hadoopFS.listFiles(source, true);
- if (sourceFiles != null) {
- while (sourceFiles.hasNext()) {
- boolean succeeded = FileUtil.copy(_hadoopFS, sourceFiles.next().getPath(), _hadoopFS, target, true, _hadoopConf);
- if (!succeeded) {
- return false;
- }
+ FileStatus[] srcFiles = listStatus(source, true);
+ for (FileStatus srcFile : srcFiles) {
+ Path srcPath = srcFile.getPath();
+ Path dstPath = new Path(dstUri.getPath(), srcFile.getPath().getName());
+ if (isDirectory(srcPath.toUri())) {
+ doCopyDirectory(srcPath.toUri(), dstPath.toUri());
+ } else {
+ doCopyFile(srcPath.toUri(), dstPath.toUri());
}
}
- return true;
}
+ /**
+ * Copies a single file, creating the destination's parent directory first if it does not exist.
+ */
+ private boolean doCopyFile(URI srcUri, URI dstUri) throws IOException {
+ Path source = new Path(srcUri);
+ Path target = new Path(dstUri);
+ URI parentUri = target.getParent().toUri();
+ if (!exists(parentUri)) {
+ mkdir(parentUri);
+ }
+ return FileUtil.copy(_hadoopFS, source, _hadoopFS, target, false, _hadoopConf);
+ }
+
+ /**
+ * Checks whether the given URI exists; returns false for a null URI.
+ */
@Override
public boolean exists(URI fileUri) throws IOException {
- return _hadoopFS.exists(new Path(fileUri));
+ return fileUri != null && _hadoopFS.exists(new Path(fileUri));
}
@Override
public long length(URI fileUri) throws IOException {
- return _hadoopFS.getLength(new Path(fileUri));
+ if (isDirectory(fileUri)) {
+ throw new IllegalArgumentException("File is directory");
+ }
+ return _hadoopFS.getFileStatus(new Path(fileUri)).getLen();
}
@Override
@@ -123,10 +165,10 @@ public class HadoopPinotFS extends PinotFS {
ArrayList<String> filePathStrings = new ArrayList<>();
Path path = new Path(fileUri);
if (_hadoopFS.exists(path)) {
- RemoteIterator<LocatedFileStatus> fileListItr = _hadoopFS.listFiles(path, recursive);
- while (fileListItr != null && fileListItr.hasNext()) {
- LocatedFileStatus file = fileListItr.next();
- filePathStrings.add(file.getPath().toUri().toString());
+ // _hadoopFS.listFiles(path, false) will not return directories as files, thus use listStatus(path) here.
+ FileStatus[] files = listStatus(path, recursive);
+ for (FileStatus file : files) {
+ filePathStrings.add(file.getPath().toUri().getRawPath());
}
} else {
throw new IllegalArgumentException("segmentUri is not valid");
@@ -136,6 +178,20 @@ public class HadoopPinotFS extends PinotFS {
return retArray;
}
+ private FileStatus[] listStatus(Path path, boolean recursive) throws IOException {
+ List<FileStatus> fileStatuses = new ArrayList<>();
+ FileStatus[] files = _hadoopFS.listStatus(path);
+ for (FileStatus file : files) {
+ fileStatuses.add(file);
+ if (file.isDirectory() && recursive) {
+ List<FileStatus> subFiles = Arrays.asList(listStatus(file.getPath(), true));
+ fileStatuses.addAll(subFiles);
+ }
+ }
+ FileStatus[] fileStatusesArr = new FileStatus[fileStatuses.size()];
+ return fileStatuses.toArray(fileStatusesArr);
+ }
+
@Override
public void copyToLocalFile(URI srcUri, File dstFile) throws Exception {
LOGGER.debug("starting to fetch segment from hdfs");
@@ -172,9 +228,8 @@ public class HadoopPinotFS extends PinotFS {
}
@Override
- public boolean isDirectory(URI uri) {
- FileStatus fileStatus = new FileStatus();
- fileStatus.setPath(new Path(uri));
+ public boolean isDirectory(URI uri) throws IOException {
+ FileStatus fileStatus = _hadoopFS.getFileStatus(new Path(uri));
return fileStatus.isDirectory();
}
diff --git a/pinot-hadoop-filesystem/src/test/java/com/linkedin/pinot/filesystem/HadoopPinotFSTest.java b/pinot-hadoop-filesystem/src/test/java/com/linkedin/pinot/filesystem/HadoopPinotFSTest.java
new file mode 100644
index 0000000..8089f0e
--- /dev/null
+++ b/pinot-hadoop-filesystem/src/test/java/com/linkedin/pinot/filesystem/HadoopPinotFSTest.java
@@ -0,0 +1,220 @@
+package com.linkedin.pinot.filesystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class HadoopPinotFSTest {
+ private File testFile;
+ private File _absoluteTmpDirPath;
+ private File _newTmpDir;
+ private File _nonExistentTmpFolder;
+ private HadoopPinotFS _hadoopPinotFS;
+
+ @BeforeClass
+ public void setup() {
+ _absoluteTmpDirPath =
+ new File(System.getProperty("java.io.tmpdir"), HadoopPinotFSTest.class.getSimpleName() + "first");
+ FileUtils.deleteQuietly(_absoluteTmpDirPath);
+ Assert.assertTrue(_absoluteTmpDirPath.mkdir(), "Could not make directory " + _absoluteTmpDirPath.getPath());
+ try {
+ testFile = new File(_absoluteTmpDirPath, "testFile");
+ Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
+ Assert.assertTrue(testFile.exists());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ _newTmpDir = new File(System.getProperty("java.io.tmpdir"), HadoopPinotFSTest.class.getSimpleName() + "second");
+ FileUtils.deleteQuietly(_newTmpDir);
+ Assert.assertTrue(_newTmpDir.mkdir(), "Could not make directory " + _newTmpDir.getPath());
+
+ _nonExistentTmpFolder = new File(System.getProperty("java.io.tmpdir"),
+ HadoopPinotFSTest.class.getSimpleName() + "nonExistentParent/nonExistent");
+
+ _absoluteTmpDirPath.deleteOnExit();
+ _newTmpDir.deleteOnExit();
+ _nonExistentTmpFolder.deleteOnExit();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ _absoluteTmpDirPath.delete();
+ _newTmpDir.delete();
+ }
+
+ @Test
+ public void testHadoopPinotFS() throws Exception {
+ _hadoopPinotFS = new HadoopPinotFS();
+
+ final Configuration conf = new PropertiesConfiguration();
+ _hadoopPinotFS.init(conf);
+
+ // Check whether a directory exists
+ Assert.assertTrue(_hadoopPinotFS.exists(_absoluteTmpDirPath.toURI()));
+ Assert.assertTrue(_hadoopPinotFS.lastModified(_absoluteTmpDirPath.toURI()) > 0L);
+ Assert.assertTrue(_hadoopPinotFS.isDirectory(_absoluteTmpDirPath.toURI()));
+
+ URI testFileUri = testFile.toURI();
+ // Check whether a file exists
+ Assert.assertTrue(_hadoopPinotFS.exists(testFileUri));
+ Assert.assertFalse(_hadoopPinotFS.isDirectory(testFileUri));
+
+ File file = new File(_absoluteTmpDirPath, "secondTestFile");
+ URI secondTestFileUri = file.toURI();
+ // Check that file does not exist
+ Assert.assertFalse(_hadoopPinotFS.exists(secondTestFileUri));
+
+ String[] files = _hadoopPinotFS.listFiles(_absoluteTmpDirPath.toURI(), true);
+ Assert.assertEquals(files.length, 1);
+ _hadoopPinotFS.copy(testFileUri, secondTestFileUri);
+ files = _hadoopPinotFS.listFiles(_absoluteTmpDirPath.toURI(), true);
+ Assert.assertEquals(files.length, 2);
+ // Check file copy worked when file was not created
+ Assert.assertTrue(_hadoopPinotFS.exists(secondTestFileUri));
+
+ // Create another file in the same path
+ File thirdTestFile = new File(_absoluteTmpDirPath, "thirdTestFile");
+ Assert.assertTrue(thirdTestFile.createNewFile(), "Could not create file " + thirdTestFile.getPath());
+
+ File newAbsoluteTempDirPath = new File(_absoluteTmpDirPath, "absoluteTwo");
+ Assert.assertTrue(newAbsoluteTempDirPath.mkdir());
+
+ // Create a testDir and file underneath directory
+ File testDir = new File(newAbsoluteTempDirPath, "testDir");
+ Assert.assertTrue(testDir.mkdir(), "Could not make directory " + testDir.getAbsolutePath());
+ File testDirFile = new File(testDir, "testFile");
+ // Assert that recursive list files and nonrecursive list files are as expected
+ Assert.assertTrue(testDirFile.createNewFile(), "Could not create file " + testDir.getAbsolutePath());
+ Assert.assertEquals(_hadoopPinotFS.listFiles(newAbsoluteTempDirPath.toURI(), false),
+ new String[]{testDir.getAbsolutePath()});
+ Assert.assertEquals(_hadoopPinotFS.listFiles(newAbsoluteTempDirPath.toURI(), true),
+ new String[]{testDir.getAbsolutePath(), testDirFile.getAbsolutePath()});
+
+ // Create another parent dir so we can test recursive move
+ File newAbsoluteTempDirPath3 = new File(_absoluteTmpDirPath, "absoluteThree");
+ Assert.assertTrue(newAbsoluteTempDirPath3.mkdir());
+ Assert.assertEquals(newAbsoluteTempDirPath3.listFiles().length, 0);
+
+ _hadoopPinotFS.move(newAbsoluteTempDirPath.toURI(), newAbsoluteTempDirPath3.toURI(), true);
+ Assert.assertFalse(_hadoopPinotFS.exists(newAbsoluteTempDirPath.toURI()));
+ Assert.assertTrue(_hadoopPinotFS.exists(newAbsoluteTempDirPath3.toURI()));
+ Assert.assertTrue(_hadoopPinotFS.exists(new File(newAbsoluteTempDirPath3, "testDir").toURI()));
+ Assert.assertTrue(
+ _hadoopPinotFS.exists(new File(new File(newAbsoluteTempDirPath3, "testDir"), "testFile").toURI()));
+
+ // Check file copy to location where something already exists still works
+ _hadoopPinotFS.copy(testFileUri, thirdTestFile.toURI());
+ // Check length of file
+ Assert.assertEquals(_hadoopPinotFS.length(secondTestFileUri), 0);
+ Assert.assertTrue(_hadoopPinotFS.exists(thirdTestFile.toURI()));
+
+ // Check that method deletes dst directory during move and is successful by overwriting dir
+ Assert.assertTrue(_newTmpDir.exists());
+ // create a file in the dst folder
+ File dstFile = new File(_newTmpDir.getPath() + "/newFile");
+ dstFile.createNewFile();
+
+ // Expected that if the target already exists, a move without overwrite will not succeed
+ Assert.assertFalse(_hadoopPinotFS.move(_absoluteTmpDirPath.toURI(), _newTmpDir.toURI(), false));
+
+ int numFiles = _absoluteTmpDirPath.listFiles().length;
+ Assert.assertTrue(_hadoopPinotFS.move(_absoluteTmpDirPath.toURI(), _newTmpDir.toURI(), true));
+ Assert.assertEquals(_absoluteTmpDirPath.length(), 0);
+ Assert.assertEquals(_newTmpDir.listFiles().length, numFiles);
+ Assert.assertFalse(dstFile.exists());
+
+ // Check that copying a file to a non-existent destination folder will work
+ FileUtils.deleteQuietly(_nonExistentTmpFolder);
+ Assert.assertFalse(_nonExistentTmpFolder.exists());
+ File srcFile = new File(_absoluteTmpDirPath, "srcFile");
+ _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+ Assert.assertTrue(srcFile.createNewFile());
+ dstFile = new File(_nonExistentTmpFolder.getPath() + "/newFile");
+ Assert.assertFalse(dstFile.exists());
+ Assert.assertTrue(_hadoopPinotFS.copy(srcFile.toURI(), dstFile.toURI()));
+ Assert.assertTrue(srcFile.exists());
+ Assert.assertTrue(dstFile.exists());
+
+ //Check that copying a folder to a non-existent destination folder works
+ FileUtils.deleteQuietly(_nonExistentTmpFolder);
+ Assert.assertFalse(_nonExistentTmpFolder.exists());
+ _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+ dstFile = new File(_nonExistentTmpFolder.getPath() + "/srcFile");
+ Assert.assertFalse(dstFile.exists());
+ Assert.assertTrue(_hadoopPinotFS.copy(_absoluteTmpDirPath.toURI(), _nonExistentTmpFolder.toURI()));
+ Assert.assertTrue(dstFile.exists());
+ FileUtils.deleteQuietly(srcFile);
+ Assert.assertFalse(srcFile.exists());
+
+ // Check that moving a file to a non-existent destination folder will work
+ FileUtils.deleteQuietly(_nonExistentTmpFolder);
+ Assert.assertFalse(_nonExistentTmpFolder.exists());
+ srcFile = new File(_absoluteTmpDirPath, "srcFile");
+ _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+ Assert.assertTrue(srcFile.createNewFile());
+ dstFile = new File(_nonExistentTmpFolder.getPath() + "/newFile");
+ Assert.assertFalse(dstFile.exists());
+ Assert.assertTrue(_hadoopPinotFS.move(srcFile.toURI(), dstFile.toURI(), true)); // overwrite flag has no impact
+ Assert.assertFalse(srcFile.exists());
+ Assert.assertTrue(dstFile.exists());
+
+ //Check that moving a folder to a non-existent destination folder works
+ FileUtils.deleteQuietly(_nonExistentTmpFolder);
+ Assert.assertFalse(_nonExistentTmpFolder.exists());
+ srcFile = new File(_absoluteTmpDirPath, "srcFile");
+ _hadoopPinotFS.mkdir(_absoluteTmpDirPath.toURI());
+ Assert.assertTrue(srcFile.createNewFile());
+ dstFile = new File(_nonExistentTmpFolder.getPath() + "/srcFile");
+ Assert.assertFalse(dstFile.exists());
+ Assert.assertTrue(_hadoopPinotFS.move(_absoluteTmpDirPath.toURI(), _nonExistentTmpFolder.toURI(),
+ true)); // overwrite flag has no impact
+ Assert.assertTrue(dstFile.exists());
+
+ _hadoopPinotFS.delete(secondTestFileUri, true);
+ // Check deletion from final location worked
+ Assert.assertFalse(_hadoopPinotFS.exists(secondTestFileUri));
+
+ File firstTempDir = new File(_absoluteTmpDirPath, "firstTempDir");
+ File secondTempDir = new File(_absoluteTmpDirPath, "secondTempDir");
+ _hadoopPinotFS.mkdir(firstTempDir.toURI());
+ Assert.assertTrue(firstTempDir.exists(), "Could not make directory " + firstTempDir.getPath());
+
+ // Check that directory only copy worked
+ _hadoopPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
+ Assert.assertTrue(_hadoopPinotFS.exists(secondTempDir.toURI()));
+
+ // Copying directory with files to directory with files
+ File testFile = new File(firstTempDir, "testFile");
+ Assert.assertTrue(testFile.createNewFile(), "Could not create file " + testFile.getPath());
+ File newTestFile = new File(secondTempDir, "newTestFile");
+ Assert.assertTrue(newTestFile.createNewFile(), "Could not create file " + newTestFile.getPath());
+
+ _hadoopPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
+ files = _hadoopPinotFS.listFiles(secondTempDir.toURI(), true);
+ Assert.assertEquals(files.length, 1);
+
+ // length() of a directory should throw an exception.
+ try {
+ _hadoopPinotFS.length(firstTempDir.toURI());
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+
+ }
+
+ Assert.assertTrue(testFile.exists());
+
+ _hadoopPinotFS.copyFromLocalFile(testFile, secondTestFileUri);
+ Assert.assertTrue(_hadoopPinotFS.exists(secondTestFileUri));
+ _hadoopPinotFS.copyToLocalFile(testFile.toURI(), new File(secondTestFileUri));
+ Assert.assertTrue(_hadoopPinotFS.exists(secondTestFileUri));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org