You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by mi...@apache.org on 2023/03/02 17:47:00 UTC

[impala] 02/03: IMPALA-11920: Support spill to HDFS address by service ID

This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 99d676f8fb71304838c8fde70d3dd220f8f1f52a
Author: Michael Smith <mi...@cloudera.com>
AuthorDate: Mon Feb 13 11:09:56 2023 -0800

    IMPALA-11920: Support spill to HDFS address by service ID
    
    Allows addressing HDFS (and Ozone) filesystems in `scratch_dirs` by a
    service identifier that doesn't include a port number. Examples:
    - "hdfs://hdfs1/:10G" uses the root directory of HDFS with a 10G limit
    - "ofs://ozone1/tmp::" uses /tmp in Ozone with default limit/priority
    
    Updates `scratch_dirs` parsing to allow whitespace after each specifier,
    as in "hdfs://hdfs1/ , /tmp". This is unambiguous and avoids failures
    for simple mistakes.
    
    Testing:
    - new backend test cases run with HDFS and Ozone
    - manually tested that Impala starts with
      --impalad_args=--scratch_dirs=ofs://localhost/tmp,/tmp
      creates impala-scratch in both locations
    
    Change-Id: Ie069cba211df85fe90d174900b20a26fcc1f7167
    Reviewed-on: http://gerrit.cloudera.org:8080/19496
    Reviewed-by: Michael Smith <mi...@cloudera.com>
    Tested-by: Michael Smith <mi...@cloudera.com>
---
 be/src/runtime/tmp-file-mgr-test.cc | 72 ++++++++++++++++++++++---------------
 be/src/runtime/tmp-file-mgr.cc      | 22 ++++++++----
 2 files changed, 59 insertions(+), 35 deletions(-)

diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index f29f12adb..69b80c975 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -1056,7 +1056,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
   // directories.
   auto& dirs2 = GetTmpDirs(
       CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:foo,/tmp/tmp-file-mgr-test2:?,"
-                       "/tmp/tmp-file-mgr-test3:1.2.3.4,/tmp/tmp-file-mgr-test4: ,"
+                       "/tmp/tmp-file-mgr-test3:1.2.3.4,/tmp/tmp-file-mgr-test4:a,"
                        "/tmp/tmp-file-mgr-test5:5pb,/tmp/tmp-file-mgr-test6:10%,"
                        "/tmp/tmp-file-mgr-test1:100"));
   EXPECT_EQ(1, dirs2.size());
@@ -1111,10 +1111,10 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) {
     EXPECT_EQ(full_hdfs_path, dirs3->path());
     EXPECT_EQ(1024, dirs3->bytes_limit());
 
-    // Multiple local paths with one remote path.
+    // Multiple local paths with one remote path. Trims spaces.
     auto tmp_mgr_4 = CreateTmpFileMgr(hdfs_path
-        + ",/tmp/local-buffer-dir1,"
-          "/tmp/local-buffer-dir2,/tmp/local-buffer-dir3");
+        + " , /tmp/local-buffer-dir1, "
+          "/tmp/local-buffer-dir2 ,/tmp/local-buffer-dir3");
     auto& dirs4_local = GetTmpDirs(tmp_mgr_4);
     auto& dirs4_remote = GetTmpRemoteDir(tmp_mgr_4);
     EXPECT_NE(nullptr, dirs4_remote);
@@ -1123,8 +1123,8 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) {
     EXPECT_EQ("/tmp/local-buffer-dir2/impala-scratch", dirs4_local[0]->path());
     EXPECT_EQ("/tmp/local-buffer-dir3/impala-scratch", dirs4_local[1]->path());
 
-    // Fails the parsing due to no port number for the HDFS path.
-    auto tmp_mgr_5 = CreateTmpFileMgr("hdfs://localhost/tmp,/tmp/local-buffer-dir");
+    // Fails to parse HDFS URI due to no path element.
+    auto tmp_mgr_5 = CreateTmpFileMgr("hdfs://localhost,/tmp/local-buffer-dir");
     auto& dirs5_local = GetTmpDirs(tmp_mgr_5);
     auto& dirs5_remote = GetTmpRemoteDir(tmp_mgr_5);
     EXPECT_EQ(1, dirs5_local.size());
@@ -1136,33 +1136,27 @@ TEST_F(TmpFileMgrTest, TestDirectoryLimitParsingRemotePath) {
 
     // Parse successfully, but the parsed HDFS path is unable to connect.
     // These cases would fail the initialization of TmpFileMgr.
-    auto& dirs7 = GetTmpRemoteDir(
-        CreateTmpFileMgr("hdfs://localhost:1/tmp::1,/tmp/local-buffer-dir", false));
-    EXPECT_EQ(nullptr, dirs7);
-
-    auto& dirs8 = GetTmpRemoteDir(
-        CreateTmpFileMgr("hdfs://localhost:/tmp::,/tmp/local-buffer-dir", false));
-    EXPECT_EQ(nullptr, dirs8);
-
-    auto& dirs9 = GetTmpRemoteDir(
-        CreateTmpFileMgr("hdfs://localhost/tmp::1,/tmp/local-buffer-dir", false));
-    EXPECT_EQ(nullptr, dirs9);
-
-    auto& dirs10 = GetTmpRemoteDir(
-        CreateTmpFileMgr("hdfs://localhost/tmp:1,/tmp/local-buffer-dir", false));
-    EXPECT_EQ(nullptr, dirs10);
+    for (const string& unreachable_path : {
+      "hdfs://localhost:1/tmp::1", "hdfs://localhost:/tmp::", "hdfs://localhost/tmp::1",
+      "hdfs://localhost/tmp:1", "hdfs://localhost/tmp", "hdfs://localhost/:1",
+      "hdfs://localhost:1 ", "hdfs://localhost/ "
+    }) {
+      auto& dirs = GetTmpRemoteDir(
+          CreateTmpFileMgr(unreachable_path + ",/tmp/local-buffer-dir", false));
+      EXPECT_EQ(nullptr, dirs) << unreachable_path;
+    }
 
     // Multiple remote paths, should support only one.
-    auto& dirs11 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute(
+    auto& dirs6 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute(
         "$0,hdfs://localhost:20501/tmp,/tmp/local-buffer-dir", hdfs_path)));
-    EXPECT_NE(nullptr, dirs11);
-    EXPECT_EQ(full_hdfs_path, dirs11->path());
+    EXPECT_NE(nullptr, dirs6);
+    EXPECT_EQ(full_hdfs_path, dirs6->path());
 
     // The order of the buffer and the remote dir should not affect the result.
-    auto& dirs12 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute(
+    auto& dirs7 = GetTmpRemoteDir(CreateTmpFileMgr(Substitute(
         "/tmp/local-buffer-dir,$0,hdfs://localhost:20501/tmp", hdfs_path)));
-    EXPECT_NE(nullptr, dirs12);
-    EXPECT_EQ(full_hdfs_path, dirs12->path());
+    EXPECT_NE(nullptr, dirs7);
+    EXPECT_EQ(full_hdfs_path, dirs7->path());
   }
 
   // Successful cases for parsing S3 paths.
@@ -1385,7 +1379,7 @@ TEST_F(TmpFileMgrTest, TestDirectoryPriorityParsing) {
   // directories.
   auto& dirs2 = GetTmpDirs(
       CreateTmpFileMgr("/tmp/tmp-file-mgr-test1::foo,/tmp/tmp-file-mgr-test2::?,"
-                       "/tmp/tmp-file-mgr-test3::1.2.3.4,/tmp/tmp-file-mgr-test4:: ,"
+                       "/tmp/tmp-file-mgr-test3::1.2.3.4,/tmp/tmp-file-mgr-test4::a,"
                        "/tmp/tmp-file-mgr-test5::p0,/tmp/tmp-file-mgr-test6::10%,"
                        "/tmp/tmp-file-mgr-test1:100:-1"));
   EXPECT_EQ(1, dirs2.size());
@@ -2236,4 +2230,26 @@ TEST_F(TmpFileMgrTest, TestBatchReadingSetMaxBytes) {
   }
 }
 
+TEST_F(TmpFileMgrTest, TestHdfsScratchParsing) {
+  struct parsed { string path; int64_t bytes_limit; int priority; };
+  constexpr int64_t dbytes = numeric_limits<int64_t>::max();
+  constexpr int dpriority = numeric_limits<int>::max();
+  for (auto& valid : map<string, parsed>{
+    { "hdfs://10.0.0.1:8020", parsed{"hdfs://10.0.0.1:8020", dbytes, dpriority} },
+    { "hdfs://10.0.0.1:8020:1024:", parsed{"hdfs://10.0.0.1:8020", 1024, dpriority} },
+    { "hdfs://10.0.0.1/", parsed{"hdfs://10.0.0.1", dbytes, dpriority} },
+    { "hdfs://10.0.0.1/:1k", parsed{"hdfs://10.0.0.1", 1024, dpriority} },
+    { "hdfs://localhost/tmp::", parsed{"hdfs://localhost/tmp", dbytes, dpriority} },
+    { "hdfs://localhost/tmp/:10k:1", parsed{"hdfs://localhost/tmp", 10240, 1} },
+    { "ofs://ozone1/tmp:10k:1", parsed{"ofs://ozone1/tmp", 10240, 1} },
+    { "ofs://ozone1/tmp::1", parsed{"ofs://ozone1/tmp", dbytes, 1} },
+  }) {
+    TmpDirHdfs fs(valid.first);
+    EXPECT_OK(fs.Parse());
+    EXPECT_EQ(path(valid.second.path) / "impala-scratch", fs.path());
+    EXPECT_EQ(valid.second.bytes_limit, fs.bytes_limit());
+    EXPECT_EQ(valid.second.priority, fs.priority());
+  }
+}
+
 } // namespace impala
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index f8af71a30..ea871700c 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -287,7 +287,7 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
   // warning - we don't want to abort process startup because of misconfigured scratch,
   // since queries will generally still be runnable.
   for (const string& tmp_dir_spec : tmp_dir_specifiers) {
-    string tmp_dir_spec_trimmed(boost::algorithm::trim_left_copy(tmp_dir_spec));
+    string tmp_dir_spec_trimmed(boost::algorithm::trim_copy(tmp_dir_spec));
     std::unique_ptr<TmpDir> tmp_dir;
 
     if (IsHdfsPath(tmp_dir_spec_trimmed.c_str(), false)
@@ -811,15 +811,23 @@ Status TmpDirS3::VerifyAndCreate(MetricGroup* metrics, vector<bool>* is_tmp_dir_
 }
 
 Status TmpDirHdfs::ParsePathTokens(vector<string>& toks) {
-  // We enforce the HDFS scratch path to have the port number, and the format after split
-  // by colon is {scheme, path, port_num, [bytes_limit, [priority]]}. Coalesce the URI.
+  // HDFS scratch path can include an optional port number; URI without path and port
+  // number is ambiguous so in that case we error. Format after split by colon is
+  // {scheme, path, port_num?, [bytes_limit, [priority]]}. Coalesce the URI from tokens.
   split(toks, raw_path_, is_any_of(":"), token_compress_off);
-  if (toks.size() < 3) {
+  // Only called on paths starting with `hdfs://` or `ofs://`.
+  DCHECK(toks.size() >= 2);
+  if (toks[1].rfind("/") > 1) {
+    // Contains a slash after the scheme, so port number was omitted.
+    toks[0] = Substitute("$0:$1", toks[0], toks[1]);
+    toks.erase(toks.begin()+1);
+  } else if (toks.size() < 3) {
     return Status(
-        Substitute("The scratch path should have the port number: '$0'", raw_path_));
+        Substitute("The scratch URI must have a path or port number: '$0'", raw_path_));
+  } else {
+    toks[0] = Substitute("$0:$1:$2", toks[0], toks[1], toks[2]);
+    toks.erase(toks.begin()+1, toks.begin()+3);
   }
-  toks[0] = Substitute("$0:$1:$2", toks[0], toks[1], toks[2]);
-  toks.erase(toks.begin()+1, toks.begin()+3);
   return Status::OK();
 }