You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by jo...@apache.org on 2022/01/06 23:19:42 UTC
[impala] 01/02: IMPALA-10934 (Part 1): Enable table definition over a single file
This is an automated email from the ASF dual-hosted git repository.
joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7131ee34d25b2609c4eb0cc596bc1d99d6232acd
Author: Zoltan Haindrich <ki...@rxd.hu>
AuthorDate: Wed Nov 3 19:07:58 2021 +0100
IMPALA-10934 (Part 1): Enable table definition over a single file
Implements an abstraction layer to show files in a single directory.
This is the Impala-side part; the filesystem drivers are in HIVE-25569.
Suppose that the filesystem has a directory in which there are multiple
files:
hdfs://somedir/f1.txt
hdfs://somedir/f2.txt
For an HMS-backed table, the contents of such a directory are
considered to be the table.
This patch enables a new file system wrapper 'sfs+' (sfs = single file
system) which provides a view of a single file in a directory. The '+'
indicates that this wrapper can be added on top of multiple underlying
file systems/object stores such as HDFS, S3, etc. A directory that
contains only the given file can be specified as:
sfs+hdfs://somedir/f1.txt/#SINGLEFILE#
This will be a directory containing only f1.txt and nothing else.
This patch was tested locally with a custom build of Hive that also
included HIVE-25569.
Change-Id: I32be936243aa4c8320f5d06d2b7fbf98822f82e7
Reviewed-on: http://gerrit.cloudera.org:8080/17878
Reviewed-by: Aman Sinha <am...@cloudera.com>
Tested-by: Aman Sinha <am...@cloudera.com>
---
be/src/exec/hdfs-table-sink.cc | 1 +
be/src/runtime/io/disk-io-mgr-test.cc | 4 +++-
be/src/runtime/io/disk-io-mgr.cc | 8 ++++++++
be/src/runtime/io/disk-io-mgr.h | 4 ++++
be/src/util/hdfs-util.cc | 5 +++++
be/src/util/hdfs-util.h | 3 +++
fe/src/main/java/org/apache/impala/common/FileSystemUtil.java | 7 ++++++-
java/shaded-deps/hive-exec/pom.xml | 2 ++
8 files changed, 32 insertions(+), 2 deletions(-)
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index b65e338..8a72d6c 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -461,6 +461,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state,
IsADLSPath(tmp_hdfs_file_name_cstr) ||
IsGcsPath(tmp_hdfs_file_name_cstr) ||
IsCosPath(tmp_hdfs_file_name_cstr) ||
+ IsSFSPath(tmp_hdfs_file_name_cstr) ||
IsOzonePath(tmp_hdfs_file_name_cstr)) {
// On S3A, the file cannot be stat'ed until after it's closed, and even so, the block
// size reported will be just the filesystem default. Similarly, the block size
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index f491f59..4dab7af 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -58,6 +58,7 @@ DECLARE_int32(num_ozone_io_threads);
DECLARE_int32(num_oss_io_threads);
DECLARE_int32(num_remote_hdfs_file_oper_io_threads);
DECLARE_int32(num_s3_file_oper_io_threads);
+DECLARE_int32(num_sfs_io_threads);
#ifndef NDEBUG
DECLARE_int32(stress_disk_read_delay_ms);
@@ -1719,7 +1720,8 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
+ FLAGS_num_ozone_io_threads + FLAGS_num_oss_io_threads
+ FLAGS_num_remote_hdfs_file_oper_io_threads
+ FLAGS_num_s3_file_oper_io_threads + FLAGS_num_gcs_io_threads
- + FLAGS_num_cos_io_threads;
+ + FLAGS_num_cos_io_threads
+ + FLAGS_num_sfs_io_threads;
// Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
// Since we do not have control over which disk is used, we check for either type
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index aef83f9..22bead3 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -142,6 +142,9 @@ DEFINE_int32(num_cos_io_threads, 16, "Number of COS I/O threads");
// The maximum number of Ozone I/O threads. TODO: choose the default empirically.
DEFINE_int32(num_ozone_io_threads, 16, "Number of Ozone I/O threads");
+// The maximum number of SFS I/O threads.
+DEFINE_int32(num_sfs_io_threads, 16, "Number of SFS I/O threads");
+
// The number of cached file handles defines how much memory can be used per backend for
// caching frequently used file handles. Measurements indicate that a single file handle
// uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory.
@@ -492,6 +495,9 @@ Status DiskIoMgr::Init() {
} else if (i == RemoteS3DiskFileOperId()) {
num_threads_per_disk = FLAGS_num_s3_file_oper_io_threads;
device_name = "S3 remote file operations";
+ } else if (i == RemoteSFSDiskId()) {
+ num_threads_per_disk = FLAGS_num_sfs_io_threads;
+ device_name = "SFS remote";
} else if (DiskInfo::is_rotational(i)) {
num_threads_per_disk = num_io_threads_per_rotational_disk_;
// During tests, i may not point to an existing disk.
@@ -825,6 +831,7 @@ int DiskIoMgr::AssignQueue(
if (IsGcsPath(file, check_default_fs)) return RemoteGcsDiskId();
if (IsCosPath(file, check_default_fs)) return RemoteCosDiskId();
if (IsOzonePath(file, check_default_fs)) return RemoteOzoneDiskId();
+ if (IsSFSPath(file, check_default_fs)) return RemoteSFSDiskId();
}
// Assign to a local disk queue.
DCHECK(!IsS3APath(file, check_default_fs)); // S3 is always remote.
@@ -834,6 +841,7 @@ int DiskIoMgr::AssignQueue(
DCHECK(!IsGcsPath(file, check_default_fs)); // GCS is always remote.
DCHECK(!IsCosPath(file, check_default_fs)); // COS is always remote.
DCHECK(!IsOzonePath(file, check_default_fs)); // Ozone is always remote.
+ DCHECK(!IsSFSPath(file, check_default_fs)); // SFS is always remote.
if (disk_id == -1) {
// disk id is unknown, assign it an arbitrary one.
disk_id = next_disk_id_.Add(1);
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 9495673..95bf0e8 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -339,6 +339,9 @@ class DiskIoMgr : public CacheLineAligned {
/// The disk ID (and therefore disk_queues_ index) used for Ozone accesses.
int RemoteOzoneDiskId() const { return num_local_disks() + REMOTE_OZONE_DISK_OFFSET; }
+ /// The disk ID (and therefore disk_queues_ index) used for SFS accesses.
+ int RemoteSFSDiskId() const { return num_local_disks() + REMOTE_SFS_DISK_OFFSET; }
+
/// Dumps the disk IoMgr queues (for readers and disks)
std::string DebugString();
@@ -397,6 +400,7 @@ class DiskIoMgr : public CacheLineAligned {
REMOTE_OZONE_DISK_OFFSET,
REMOTE_DFS_DISK_FILE_OPER_OFFSET,
REMOTE_S3_DISK_FILE_OPER_OFFSET,
+ REMOTE_SFS_DISK_OFFSET,
REMOTE_OSS_DISK_OFFSET,
REMOTE_NUM_DISKS
};
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 456dd8a..1d6db5f 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -38,6 +38,7 @@ const char* FILESYS_PREFIX_GCS = "gs://";
const char* FILESYS_PREFIX_COS = "cosn://";
const char* FILESYS_PREFIX_OZONE = "o3fs://";
const char* FILESYS_PREFIX_OFS = "ofs://";
+const char* FILESYS_PREFIX_SFS = "sfs+";
const char* FILESYS_PREFIX_OSS = "oss://";
const char* FILESYS_PREFIX_JINDOFS = "jfs://";
@@ -133,6 +134,10 @@ bool IsOzonePath(const char* path, bool check_default_fs) {
|| IsSpecificPath(path, FILESYS_PREFIX_OFS, check_default_fs);
}
+bool IsSFSPath(const char* path, bool check_default_fs) {
+ return IsSpecificPath(path, FILESYS_PREFIX_SFS, check_default_fs);
+}
+
// Returns the length of the filesystem name in 'path' which is the length of the
// 'scheme://authority'. Returns 0 if the path is unqualified.
static int GetFilesystemNameLength(const char* path) {
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index 8de3fcc..a31a76d 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -80,6 +80,9 @@ bool IsCosPath(const char* path, bool check_default_fs = true);
/// Returns true iff the path refers to a location on an Ozone filesystem.
bool IsOzonePath(const char* path, bool check_default_fs = true);
+/// Returns true iff the path refers to a location on an SFS filesystem.
+bool IsSFSPath(const char* path, bool check_default_fs = true);
+
/// Returns true iff 'pathA' and 'pathB' are on the same filesystem.
bool FilesystemsMatch(const char* pathA, const char* pathB);
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 09a353a..e5439d6 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -69,6 +69,7 @@ public class FileSystemUtil {
public static final String SCHEME_ALLUXIO = "alluxio";
public static final String SCHEME_GCS = "gs";
public static final String SCHEME_COS = "cosn";
+ public static final String SCHEME_SFS = "sfs";
/**
* Set containing all FileSystem scheme that known to supports storage UUIDs in
@@ -531,7 +532,8 @@ public class FileSystemUtil {
OZONE,
ALLUXIO,
GCS,
- COS;
+ COS,
+ SFS;
private static final Map<String, FsType> SCHEME_TO_FS_MAPPING =
ImmutableMap.<String, FsType>builder()
@@ -556,6 +558,9 @@ public class FileSystemUtil {
* hdfs, s3a, etc.)
*/
public static FsType getFsType(String scheme) {
+ if(scheme.startsWith("sfs+")) {
+ return SFS;
+ }
return SCHEME_TO_FS_MAPPING.get(scheme);
}
}
diff --git a/java/shaded-deps/hive-exec/pom.xml b/java/shaded-deps/hive-exec/pom.xml
index 7fd6a72..58956c9 100644
--- a/java/shaded-deps/hive-exec/pom.xml
+++ b/java/shaded-deps/hive-exec/pom.xml
@@ -89,6 +89,8 @@ the same dependencies
<filter>
<artifact>org.apache.hive:hive-exec</artifact>
<includes>
+ <include>META-INF/services/org.apache.hadoop.fs.FileSystem</include>
+ <include>org/apache/hadoop/hive/ql/io/SingleFileSystem*</include>
<include>org/apache/hadoop/hive/conf/**/*</include>
<include>org/apache/hadoop/hive/common/type/*</include>
<include>org/apache/hadoop/hive/common/FileUtils*</include>