Posted to commits@impala.apache.org by jo...@apache.org on 2022/01/06 23:19:42 UTC

[impala] 01/02: IMPALA-10934 (Part 1): Enable table definition over a single file

This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7131ee34d25b2609c4eb0cc596bc1d99d6232acd
Author: Zoltan Haindrich <ki...@rxd.hu>
AuthorDate: Wed Nov 3 19:07:58 2021 +0100

    IMPALA-10934 (Part 1): Enable table definition over a single file
    
    Implements an abstraction layer that exposes a single file as a
    directory containing only that file. This is the Impala-side part;
    the filesystem drivers are in HIVE-25569.
    
    Suppose that the filesystem has a directory in which there are multiple
    files:
     hdfs://somedir/f1.txt
     hdfs://somedir/f2.txt
    
    For an HMS-backed table, the contents of a directory are considered
    to be the table's data.
    
    This patch enables a new file system wrapper, 'sfs+' (sfs = single file
    system), which provides a view of a single file as a directory. The '+'
    indicates that the wrapper can be layered on top of different underlying
    file systems or object stores such as HDFS, S3, etc. The directory view
    of a file is specified as:
      sfs+hdfs://somedir/f1.txt/#SINGLEFILE#
    
    This path acts as a directory containing only f1.txt and nothing else.
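    
    For illustration, a minimal sketch of listing such a path through the
    Hadoop FileSystem API, assuming a Hive build with the HIVE-25569
    SingleFileSystem classes on the classpath (the class name below is
    only for the sketch):
    
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileStatus;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;
    
      public class SfsListingSketch {
        public static void main(String[] args) throws Exception {
          // f1.txt viewed as the sole entry of a synthetic directory.
          Path dir = new Path("sfs+hdfs://somedir/f1.txt/#SINGLEFILE#");
          // Resolves the FileSystem registered for the 'sfs+hdfs' scheme
          // (registered via META-INF/services by the HIVE-25569 classes).
          FileSystem fs = dir.getFileSystem(new Configuration());
          // Expected to print exactly one entry: f1.txt.
          for (FileStatus status : fs.listStatus(dir)) {
            System.out.println(status.getPath());
          }
        }
      }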
    
    This patch was tested locally with a custom Hive build that also
    included HIVE-25569.
    
    Change-Id: I32be936243aa4c8320f5d06d2b7fbf98822f82e7
    Reviewed-on: http://gerrit.cloudera.org:8080/17878
    Reviewed-by: Aman Sinha <am...@cloudera.com>
    Tested-by: Aman Sinha <am...@cloudera.com>
---
 be/src/exec/hdfs-table-sink.cc                                | 1 +
 be/src/runtime/io/disk-io-mgr-test.cc                         | 4 +++-
 be/src/runtime/io/disk-io-mgr.cc                              | 8 ++++++++
 be/src/runtime/io/disk-io-mgr.h                               | 4 ++++
 be/src/util/hdfs-util.cc                                      | 5 +++++
 be/src/util/hdfs-util.h                                       | 3 +++
 fe/src/main/java/org/apache/impala/common/FileSystemUtil.java | 7 ++++++-
 java/shaded-deps/hive-exec/pom.xml                            | 2 ++
 8 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index b65e338..8a72d6c 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -461,6 +461,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
       IsADLSPath(tmp_hdfs_file_name_cstr) ||
       IsGcsPath(tmp_hdfs_file_name_cstr) ||
       IsCosPath(tmp_hdfs_file_name_cstr) ||
+      IsSFSPath(tmp_hdfs_file_name_cstr) ||
       IsOzonePath(tmp_hdfs_file_name_cstr)) {
     // On S3A, the file cannot be stat'ed until after it's closed, and even so, the block
     // size reported will be just the filesystem default. Similarly, the block size
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index f491f59..4dab7af 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -58,6 +58,7 @@ DECLARE_int32(num_ozone_io_threads);
 DECLARE_int32(num_oss_io_threads);
 DECLARE_int32(num_remote_hdfs_file_oper_io_threads);
 DECLARE_int32(num_s3_file_oper_io_threads);
+DECLARE_int32(num_sfs_io_threads);
 
 #ifndef NDEBUG
 DECLARE_int32(stress_disk_read_delay_ms);
@@ -1719,7 +1720,8 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
       + FLAGS_num_ozone_io_threads + FLAGS_num_oss_io_threads
       + FLAGS_num_remote_hdfs_file_oper_io_threads
       + FLAGS_num_s3_file_oper_io_threads + FLAGS_num_gcs_io_threads
-      + FLAGS_num_cos_io_threads;
+      + FLAGS_num_cos_io_threads
+      + FLAGS_num_sfs_io_threads;
 
   // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
   // Since we do not have control over which disk is used, we check for either type
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index aef83f9..22bead3 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -142,6 +142,9 @@ DEFINE_int32(num_cos_io_threads, 16, "Number of COS I/O threads");
 // The maximum number of Ozone I/O threads. TODO: choose the default empirically.
 DEFINE_int32(num_ozone_io_threads, 16, "Number of Ozone I/O threads");
 
+// The maximum number of SFS I/O threads.
+DEFINE_int32(num_sfs_io_threads, 16, "Number of SFS I/O threads");
+
 // The number of cached file handles defines how much memory can be used per backend for
 // caching frequently used file handles. Measurements indicate that a single file handle
 // uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory.
@@ -492,6 +495,9 @@ Status DiskIoMgr::Init() {
     } else if (i == RemoteS3DiskFileOperId()) {
       num_threads_per_disk = FLAGS_num_s3_file_oper_io_threads;
       device_name = "S3 remote file operations";
+    } else if (i == RemoteSFSDiskId()) {
+      num_threads_per_disk = FLAGS_num_sfs_io_threads;
+      device_name = "SFS remote";
     } else if (DiskInfo::is_rotational(i)) {
       num_threads_per_disk = num_io_threads_per_rotational_disk_;
       // During tests, i may not point to an existing disk.
@@ -825,6 +831,7 @@ int DiskIoMgr::AssignQueue(
     if (IsGcsPath(file, check_default_fs)) return RemoteGcsDiskId();
     if (IsCosPath(file, check_default_fs)) return RemoteCosDiskId();
     if (IsOzonePath(file, check_default_fs)) return RemoteOzoneDiskId();
+    if (IsSFSPath(file, check_default_fs)) return RemoteSFSDiskId();
   }
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file, check_default_fs)); // S3 is always remote.
@@ -834,6 +841,7 @@ int DiskIoMgr::AssignQueue(
   DCHECK(!IsGcsPath(file, check_default_fs)); // GCS is always remote.
   DCHECK(!IsCosPath(file, check_default_fs)); // COS is always remote.
   DCHECK(!IsOzonePath(file, check_default_fs)); // Ozone is always remote.
+  DCHECK(!IsSFSPath(file, check_default_fs)); // SFS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 9495673..95bf0e8 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -339,6 +339,9 @@ class DiskIoMgr : public CacheLineAligned {
   /// The disk ID (and therefore disk_queues_ index) used for Ozone accesses.
   int RemoteOzoneDiskId() const { return num_local_disks() + REMOTE_OZONE_DISK_OFFSET; }
 
+  /// The disk ID (and therefore disk_queues_ index) used for SFS accesses.
+  int RemoteSFSDiskId() const { return num_local_disks() + REMOTE_SFS_DISK_OFFSET; }
+
   /// Dumps the disk IoMgr queues (for readers and disks)
   std::string DebugString();
 
@@ -397,6 +400,7 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_OZONE_DISK_OFFSET,
     REMOTE_DFS_DISK_FILE_OPER_OFFSET,
     REMOTE_S3_DISK_FILE_OPER_OFFSET,
+    REMOTE_SFS_DISK_OFFSET,
     REMOTE_OSS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 456dd8a..1d6db5f 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -38,6 +38,7 @@ const char* FILESYS_PREFIX_GCS = "gs://";
 const char* FILESYS_PREFIX_COS = "cosn://";
 const char* FILESYS_PREFIX_OZONE = "o3fs://";
 const char* FILESYS_PREFIX_OFS = "ofs://";
+const char* FILESYS_PREFIX_SFS = "sfs+";
 const char* FILESYS_PREFIX_OSS = "oss://";
 const char* FILESYS_PREFIX_JINDOFS = "jfs://";
 
@@ -133,6 +134,10 @@ bool IsOzonePath(const char* path, bool check_default_fs) {
       || IsSpecificPath(path, FILESYS_PREFIX_OFS, check_default_fs);
 }
 
+bool IsSFSPath(const char* path, bool check_default_fs) {
+  return IsSpecificPath(path, FILESYS_PREFIX_SFS, check_default_fs);
+}
+
 // Returns the length of the filesystem name in 'path' which is the length of the
 // 'scheme://authority'. Returns 0 if the path is unqualified.
 static int GetFilesystemNameLength(const char* path) {
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index 8de3fcc..a31a76d 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -80,6 +80,9 @@ bool IsCosPath(const char* path, bool check_default_fs = true);
 /// Returns true iff the path refers to a location on an Ozone filesystem.
 bool IsOzonePath(const char* path, bool check_default_fs = true);
 
+/// Returns true iff the path refers to a location on an SFS filesystem.
+bool IsSFSPath(const char* path, bool check_default_fs = true);
+
 /// Returns true iff 'pathA' and 'pathB' are on the same filesystem.
 bool FilesystemsMatch(const char* pathA, const char* pathB);
 
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 09a353a..e5439d6 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -69,6 +69,7 @@ public class FileSystemUtil {
   public static final String SCHEME_ALLUXIO = "alluxio";
   public static final String SCHEME_GCS = "gs";
   public static final String SCHEME_COS = "cosn";
+  public static final String SCHEME_SFS = "sfs";
 
   /**
    * Set containing all FileSystem scheme that known to supports storage UUIDs in
@@ -531,7 +532,8 @@ public class FileSystemUtil {
     OZONE,
     ALLUXIO,
     GCS,
-    COS;
+    COS,
+    SFS;
 
     private static final Map<String, FsType> SCHEME_TO_FS_MAPPING =
         ImmutableMap.<String, FsType>builder()
@@ -556,6 +558,9 @@ public class FileSystemUtil {
      * hdfs, s3a, etc.)
      */
     public static FsType getFsType(String scheme) {
+      if(scheme.startsWith("sfs+")) {
+        return SFS;
+      }
       return SCHEME_TO_FS_MAPPING.get(scheme);
     }
   }
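
A rough usage sketch of the scheme mapping above, assuming the nested FsType
enum is visible to the calling code: any scheme with an 'sfs+' prefix resolves
to the SFS type, while other schemes still go through the static
scheme-to-FsType map.

  import org.apache.impala.common.FileSystemUtil;

  public class FsTypeSketch {
    public static void main(String[] args) {
      // "sfs+hdfs", "sfs+s3a", ... all map to the SFS wrapper type.
      FileSystemUtil.FsType type = FileSystemUtil.FsType.getFsType("sfs+hdfs");
      System.out.println(type);  // prints SFS
    }
  }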
diff --git a/java/shaded-deps/hive-exec/pom.xml b/java/shaded-deps/hive-exec/pom.xml
index 7fd6a72..58956c9 100644
--- a/java/shaded-deps/hive-exec/pom.xml
+++ b/java/shaded-deps/hive-exec/pom.xml
@@ -89,6 +89,8 @@ the same dependencies
             <filter>
               <artifact>org.apache.hive:hive-exec</artifact>
               <includes>
+                <include>META-INF/services/org.apache.hadoop.fs.FileSystem</include>
+                <include>org/apache/hadoop/hive/ql/io/SingleFileSystem*</include>
                 <include>org/apache/hadoop/hive/conf/**/*</include>
                 <include>org/apache/hadoop/hive/common/type/*</include>
                 <include>org/apache/hadoop/hive/common/FileUtils*</include>