You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by jl...@apache.org on 2019/02/27 17:56:16 UTC

[incubator-pinot] branch master updated: Unify move method in PinotFS (#3834)

This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f647cf  Unify move method in PinotFS (#3834)
8f647cf is described below

commit 8f647cf423d177b612865b9206ecd490c35e731b
Author: Jialiang Li <jl...@linkedin.com>
AuthorDate: Wed Feb 27 09:56:10 2019 -0800

    Unify move method in PinotFS (#3834)
    
    * Unify move method in PinotFS
---
 .../org/apache/pinot/filesystem/AzurePinotFS.java  |  6 +--
 .../apache/pinot/controller/ControllerConf.java    |  4 +-
 .../helix/core/SegmentDeletionManager.java         | 16 +++---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  5 +-
 .../org/apache/pinot/filesystem/LocalPinotFS.java  | 17 ++----
 .../java/org/apache/pinot/filesystem/PinotFS.java  | 62 +++++++++++++++++-----
 .../apache/pinot/filesystem/LocalPinotFSTest.java  | 13 ++++-
 .../pinot/filesystem/PinotFSFactoryTest.java       |  2 +-
 .../org/apache/pinot/filesystem/HadoopPinotFS.java |  5 +-
 9 files changed, 82 insertions(+), 48 deletions(-)

diff --git a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
index 766b2d4..0f07f67 100644
--- a/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
+++ b/pinot-azure-filesystem/src/main/java/org/apache/pinot/filesystem/AzurePinotFS.java
@@ -98,12 +98,8 @@ public class AzurePinotFS extends PinotFS {
   }
 
   @Override
-  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  protected boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
-    if (exists(dstUri) && !overwrite) {
-      return false;
-    }
-    //rename the file
     return _adlStoreClient.rename(srcUri.getPath(), dstUri.getPath());
   }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 53a66bd..036add8 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -170,8 +170,8 @@ public class ControllerConf extends PropertiesConfiguration {
 
   public static URI constructSegmentLocation(String baseDataDir, String tableName, String segmentName) {
     try {
-      return new URI(StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8")));
-    } catch (UnsupportedEncodingException | URISyntaxException e) {
+      return getUriFromPath(StringUtil.join(File.separator, baseDataDir, tableName, URLEncoder.encode(segmentName, "UTF-8")));
+    } catch (UnsupportedEncodingException e) {
       LOGGER
           .error("Could not construct segment location with baseDataDir {}, tableName {}, segmentName {}", baseDataDir,
               tableName, segmentName);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index adb6cae..19ad746 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -188,12 +188,16 @@ public class SegmentDeletionManager {
       try {
         if (pinotFS.exists(fileToMoveURI)) {
           // Overwrites the file if it already exists in the target directory.
-          pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true);
-          // Updates last modified.
-          // Touch is needed here so that removeAgedDeletedSegments() works correctly.
-          pinotFS.touch(deletedSegmentDestURI);
-          LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
-              deletedSegmentDestURI.toString());
+          if (pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true)) {
+            // Updates last modified.
+            // Touch is needed here so that removeAgedDeletedSegments() works correctly.
+            pinotFS.touch(deletedSegmentDestURI);
+            LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
+                deletedSegmentDestURI.toString());
+          } else {
+            LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToMoveURI.toString(),
+                deletedSegmentDestURI.toString());
+          }
         } else {
           if (!SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
             LOGGER.warn("Not found local segment file for segment {}" + fileToMoveURI.toString());
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 6b32400..e814abe 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -385,7 +385,10 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     try {
-      pinotFS.move(segmentFileURI, uriToMoveTo, true);
+      if (!pinotFS.move(segmentFileURI, uriToMoveTo, true)) {
+        LOGGER.error("Could not move {} to {}", segmentLocation, segmentName);
+        return false;
+      }
     } catch (Exception e) {
       LOGGER.error("Could not move {} to {}", segmentLocation, segmentName, e);
       return false;
diff --git a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
index 423702b..7161031 100644
--- a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
+++ b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/LocalPinotFS.java
@@ -75,24 +75,15 @@ public class LocalPinotFS extends PinotFS {
   }
 
   @Override
-  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  protected boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
     File srcFile = new File(decodeURI(srcUri.getRawPath()));
     File dstFile = new File(decodeURI(dstUri.getRawPath()));
-    if (dstFile.exists()) {
-      if (overwrite) {
-        FileUtils.deleteQuietly(dstFile);
-      } else {
-        // dst file exists, returning
-        return false;
-      }
+    if (srcFile.isDirectory()) {
+      FileUtils.moveDirectory(srcFile, dstFile);
     } else {
-      // ensure the dst path exists
-      FileUtils.forceMkdir(dstFile.getParentFile());
+      FileUtils.moveFile(srcFile, dstFile);
     }
-
-    Files.move(srcFile.toPath(), dstFile.toPath());
-
     return true;
   }
 
diff --git a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
index 0ed9d56..8032014 100644
--- a/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
+++ b/pinot-filesystem/src/main/java/org/apache/pinot/filesystem/PinotFS.java
@@ -22,14 +22,25 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.Paths;
 import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
- * The PinotFS is intended to be a thin wrapper on top of different filesystems. This interface is intended for internal
- * Pinot use only. This class will be implemented for each pluggable storage type.
+ * PinotFS is a restricted FS API that exposes only the functionality Pinot needs in order to use
+ * different FS implementations. The restrictions are in place for two reasons:
+ * 1. Avoid unexpected performance hits from implementing a broader API - in particular, we want
+ *    to reduce calls to remote filesystems that a broader API might need
+ *    but that Pinot does not require (see the documentation for the move() method below).
+ * 2. Provide an interface that is simple to implement across different FS types,
+ *    so the contract that implementers must adhere to stays simple.
+ * Please read the method-level docs carefully to note the exceptions when using these APIs.
  */
 public abstract class PinotFS implements Closeable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PinotFS.class);
+
   /**
    * Initializes the configurations specific to that filesystem. For instance, any security related parameters can be
    * initialized here and will not be logged.
@@ -58,31 +69,54 @@ public abstract class PinotFS implements Closeable {
   /**
    * Moves the file or directory from the src to dst. Does not keep the original file. If the dst has parent directories
    * that haven't been created, this method will create all the necessary parent directories.
-   * If both src and dst are files, dst will be overwritten.
-   * If src is a file and dst is a directory, src file will get moved under dst directory.
-   * If both src and dst are directories, src directory will get moved under dst directory.
-   * If src is a directory and dst is a file, operation will fail.
+   * Note: In Pinot, we recommend specifying the full paths of both src and dst.
    * For example, if a file /a/b/c is moved to a file /x/y/z, in the case of overwrite, the directory /a/b still exists,
    * but will not contain the file 'c'. Instead, /x/y/z will contain the contents of 'c'.
-   * If a file /a is moved to a directory /x/y, all the original files under /x/y will be kept.
+   * If src is a directory /a/b which contains two files /a/b/c and /a/b/d, and the dst is /x/y, the result is
+   * that the directory /a/b is removed from /a and the dst directory contains the two files /x/y/c and /x/y/d.
+   * To move a directory /a/b under another directory /x/y, specify the dst as /x/y/b.
    * @param srcUri URI of the original file
    * @param dstUri URI of the final file location
    * @param overwrite true if we want to overwrite the dstURI, false otherwise
    * @return true if move is successful
    * @throws IOException on IO failure
    */
-  public abstract boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+      throws IOException {
+    if (!exists(srcUri)) {
+      LOGGER.warn("Source {} does not exist", srcUri);
+      return false;
+    }
+    if (exists(dstUri)) {
+      if (overwrite) {
+        delete(dstUri, true);
+      } else {
+        // dst file exists, returning
+        LOGGER.warn("Cannot move {} to {}. Destination exists and overwrite flag set to false.", srcUri, dstUri);
+        return false;
+      }
+    } else {
+      // ensures the parent path of dst exists.
+      URI parentUri = Paths.get(dstUri).getParent().toUri();
+      mkdir(parentUri);
+    }
+    return doMove(srcUri, dstUri);
+  }
+
+  /**
+   * Performs the FS-specific move. move() calls this only after confirming src exists and clearing or preparing dst.
+   */
+  protected abstract boolean doMove(URI srcUri, URI dstUri)
       throws IOException;
 
   /**
    * Copies the file or directory from the src to dst. The original file is retained. If the dst has parent directories
    * that haven't been created, this method will create all the necessary parent directories.
-   * If both src and dst are files, dst will be overwritten.
-   * If src is a file and dst is a directory, src file will get copied under dst directory.
-   * If both src and dst are directories, src directory will get copied under dst directory.
-   * If src is a directory and dst is a file, operation will fail.
-   * For example, if a file /x/y/z is copied to /a/b/c, /x/y/z will be retained and /x/y/z will also be present as /a/b/c;
-   * if a file /a is copied to a directory /x/y, all the original files under /x/y will be kept.
+   * Note: In Pinot, we recommend specifying the full paths of both src and dst.
+   * For example, if a file /a/b/c is copied to a file /x/y/z, the directory /a/b still exists and contains the file 'c'.
+   * The dst file /x/y/z will contain the contents of 'c'.
+   * If a directory /a/b is copied to another directory /x/y, the directory /x/y will contain the contents of /a/b.
+   * To copy a directory /a/b under the directory /x/y, specify the dst as /x/y/b.
    * @param srcUri URI of the original file
    * @param dstUri URI of the final file location
    * @return true if copy is successful
diff --git a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
index 0c0957c..5f1c595 100644
--- a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
+++ b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/LocalPinotFSTest.java
@@ -141,7 +141,7 @@ public class LocalPinotFSTest {
     Assert.assertEquals(_newTmpDir.listFiles().length, files);
     Assert.assertFalse(dstFile.exists());
 
-    // Check that a moving a file a non-existent destination folder will work
+    // Check that moving a file to a non-existent destination folder works
     FileUtils.deleteQuietly(_nonExistentTmpFolder);
     Assert.assertFalse(_nonExistentTmpFolder.exists());
     File srcFile = new File(_absoluteTmpDirPath, "srcFile");
@@ -153,7 +153,7 @@ public class LocalPinotFSTest {
     Assert.assertFalse(srcFile.exists());
     Assert.assertTrue(dstFile.exists());
 
-    //Check that moving a folder to a non-existent destination folder works
+    // Check that moving a folder to a non-existent destination folder works
     FileUtils.deleteQuietly(_nonExistentTmpFolder);
     Assert.assertFalse(_nonExistentTmpFolder.exists());
     srcFile = new File(_absoluteTmpDirPath, "srcFile");
@@ -210,6 +210,15 @@ public class LocalPinotFSTest {
     localPinotFS.copy(firstTempDir.toURI(), secondTempDir.toURI());
     Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), true).length, 1);
 
+    // Copying directory with files under another directory.
+    File firstTempDirUnderSecondTempDir = new File(secondTempDir, firstTempDir.getName());
+    localPinotFS.copy(firstTempDir.toURI(), firstTempDirUnderSecondTempDir.toURI());
+    Assert.assertTrue(localPinotFS.exists(firstTempDirUnderSecondTempDir.toURI()));
+    // There are two entries (files/directories) under secondTempDir.
+    Assert.assertEquals(localPinotFS.listFiles(secondTempDir.toURI(), false).length, 2);
+    // The file under the src directory is also copied under the dst directory.
+    Assert.assertEquals(localPinotFS.listFiles(firstTempDirUnderSecondTempDir.toURI(), true).length, 1);
+
     // len of dir = exception
     try {
       localPinotFS.length(firstTempDir.toURI());
diff --git a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
index 2da5766..803bcb8 100644
--- a/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
+++ b/pinot-filesystem/src/test/java/org/apache/pinot/filesystem/PinotFSFactoryTest.java
@@ -85,7 +85,7 @@ public class PinotFSFactoryTest {
     }
 
     @Override
-    public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+    public boolean doMove(URI srcUri, URI dstUri)
         throws IOException {
       return true;
     }
diff --git a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
index 1620b5a..8f6b13a 100644
--- a/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
+++ b/pinot-hadoop-filesystem/src/main/java/org/apache/pinot/filesystem/HadoopPinotFS.java
@@ -91,11 +91,8 @@ public class HadoopPinotFS extends PinotFS {
   }
 
   @Override
-  public boolean move(URI srcUri, URI dstUri, boolean overwrite)
+  protected boolean doMove(URI srcUri, URI dstUri)
       throws IOException {
-    if (exists(dstUri) && !overwrite) {
-      return false;
-    }
     return _hadoopFS.rename(new Path(srcUri), new Path(dstUri));
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org