You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by bo...@apache.org on 2023/02/10 16:18:08 UTC

[impala] branch master updated (5abbb9bd1 -> e17fd9a0d)

This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 5abbb9bd1 IMPALA-11903: Ozone emits NONE when not erasure-coded
     new f4d306cbc IMPALA-11629: Support for huawei OBS FileSystem
     new e17fd9a0d IMPALA-11850 Adds HTTP tracing headers when using the hs2-http protocol.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/io/disk-io-mgr-test.cc              |   4 +-
 be/src/runtime/io/disk-io-mgr.cc                   |   8 +
 be/src/runtime/io/disk-io-mgr.h                    |   4 +
 be/src/transport/THttpServer.cpp                   |  22 +++
 be/src/transport/THttpServer.h                     |  17 +++
 be/src/util/hdfs-util.cc                           |   5 +
 be/src/util/hdfs-util.h                            |   3 +
 bin/impala-config.sh                               |  10 ++
 .../org/apache/impala/common/FileSystemUtil.java   |  15 +-
 java/executor-deps/pom.xml                         |  37 +++++
 java/pom.xml                                       |   1 +
 shell/ImpalaHttpClient.py                          |  29 ++++
 shell/impala_client.py                             |  39 ++++-
 shell/impala_shell.py                              |  15 +-
 shell/impala_shell_config_defaults.py              |   3 +-
 shell/option_parser.py                             |   5 +
 testdata/bin/create-load-data.sh                   |   7 +-
 .../common/etc/hadoop/conf/core-site.xml.py        |   9 ++
 tests/common/impala_test_suite.py                  |   6 +-
 tests/common/skip.py                               |  11 +-
 tests/common/test_dimensions.py                    |   4 +
 tests/custom_cluster/test_metastore_service.py     |  11 +-
 tests/custom_cluster/test_shell_commandline.py     | 169 +++++++++++++++++++++
 tests/util/filesystem_utils.py                     |   4 +-
 24 files changed, 412 insertions(+), 26 deletions(-)
 create mode 100644 tests/custom_cluster/test_shell_commandline.py


[impala] 01/02: IMPALA-11629: Support for huawei OBS FileSystem

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit f4d306cbca99968871e8bc23554da44bbcb4a37b
Author: yx91490 <yx...@126.com>
AuthorDate: Sat Oct 8 06:02:45 2022 +0000

    IMPALA-11629: Support for huawei OBS FileSystem
    
    This patch adds support for huawei OBS (Object Storage Service)
    FileSystem. The implementation is similar to other remote FileSystems.
    
    New flags for OBS:
    - num_obs_io_threads: Number of OBS I/O threads. Defaults to 16.
    
    Testing:
     - Upload hdfs test data to an OBS bucket. Modify all locations in HMS
       DB to point to the OBS bucket. Remove some hdfs caching params.
       Run CORE tests.
    
    Change-Id: I84a54dbebcc5b71e9bcdd141dae9e95104d98cb1
    Reviewed-on: http://gerrit.cloudera.org:8080/19110
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/disk-io-mgr-test.cc              |  4 ++-
 be/src/runtime/io/disk-io-mgr.cc                   |  8 +++++
 be/src/runtime/io/disk-io-mgr.h                    |  4 +++
 be/src/util/hdfs-util.cc                           |  5 +++
 be/src/util/hdfs-util.h                            |  3 ++
 bin/impala-config.sh                               | 10 ++++++
 .../org/apache/impala/common/FileSystemUtil.java   | 15 ++++++++-
 java/executor-deps/pom.xml                         | 37 ++++++++++++++++++++++
 java/pom.xml                                       |  1 +
 testdata/bin/create-load-data.sh                   |  7 ++--
 .../common/etc/hadoop/conf/core-site.xml.py        |  9 ++++++
 tests/common/impala_test_suite.py                  |  6 +++-
 tests/common/skip.py                               | 11 ++++---
 tests/custom_cluster/test_metastore_service.py     | 11 +++----
 tests/util/filesystem_utils.py                     |  4 ++-
 15 files changed, 117 insertions(+), 18 deletions(-)

diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 8852762eb..f50fb7efa 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -59,6 +59,7 @@ DECLARE_int32(num_oss_io_threads);
 DECLARE_int32(num_remote_hdfs_file_oper_io_threads);
 DECLARE_int32(num_s3_file_oper_io_threads);
 DECLARE_int32(num_sfs_io_threads);
+DECLARE_int32(num_obs_io_threads);
 
 #ifndef NDEBUG
 DECLARE_int32(stress_disk_read_delay_ms);
@@ -1723,7 +1724,8 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
       + FLAGS_num_remote_hdfs_file_oper_io_threads
       + FLAGS_num_s3_file_oper_io_threads + FLAGS_num_gcs_io_threads
       + FLAGS_num_cos_io_threads
-      + FLAGS_num_sfs_io_threads;
+      + FLAGS_num_sfs_io_threads
+      + FLAGS_num_obs_io_threads;
 
   // Verify num_io_threads_per_rotational_disk and num_io_threads_per_solid_state_disk.
   // Since we do not have control over which disk is used, we check for either type
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 41a2e64d4..43b744f69 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -144,6 +144,9 @@ DEFINE_int32(num_ozone_io_threads, 16, "Number of Ozone I/O threads");
 // The maximum number of SFS I/O threads.
 DEFINE_int32(num_sfs_io_threads, 16, "Number of SFS I/O threads");
 
+// The maximum number of OBS I/O threads.
+DEFINE_int32(num_obs_io_threads, 16, "Number of OBS I/O threads");
+
 // The number of cached file handles defines how much memory can be used per backend for
 // caching frequently used file handles. Measurements indicate that a single file handle
 // uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory.
@@ -575,6 +578,9 @@ Status DiskIoMgr::Init() {
     } else if (i == RemoteSFSDiskId()) {
       num_threads_per_disk = FLAGS_num_sfs_io_threads;
       device_name = "SFS remote";
+    } else if (i == RemoteOBSDiskId()) {
+      num_threads_per_disk = FLAGS_num_obs_io_threads;
+      device_name = "OBS remote";
     } else if (DiskInfo::is_rotational(i)) {
       num_threads_per_disk = num_io_threads_per_rotational_disk_;
       // During tests, i may not point to an existing disk.
@@ -917,6 +923,7 @@ int DiskIoMgr::AssignQueue(
     if (IsCosPath(file, check_default_fs)) return RemoteCosDiskId();
     if (IsOzonePath(file, check_default_fs)) return RemoteOzoneDiskId();
     if (IsSFSPath(file, check_default_fs)) return RemoteSFSDiskId();
+    if (IsOBSPath(file, check_default_fs)) return RemoteOBSDiskId();
   }
   // Assign to a local disk queue.
   DCHECK(!IsS3APath(file, check_default_fs)); // S3 is always remote.
@@ -926,6 +933,7 @@ int DiskIoMgr::AssignQueue(
   DCHECK(!IsGcsPath(file, check_default_fs)); // GCS is always remote.
   DCHECK(!IsCosPath(file, check_default_fs)); // COS is always remote.
   DCHECK(!IsSFSPath(file, check_default_fs)); // SFS is always remote.
+  DCHECK(!IsOBSPath(file, check_default_fs)); // OBS is always remote.
   if (disk_id == -1) {
     // disk id is unknown, assign it an arbitrary one.
     disk_id = next_disk_id_.Add(1);
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index b58690889..7cf7662e3 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -342,6 +342,9 @@ class DiskIoMgr : public CacheLineAligned {
   /// The disk ID (and therefore disk_queues_ index) used for SFS accesses.
   int RemoteSFSDiskId() const { return num_local_disks() + REMOTE_SFS_DISK_OFFSET; }
 
+  /// The disk ID (and therefore disk_queues_ index) used for OBS accesses.
+  int RemoteOBSDiskId() const { return num_local_disks() + REMOTE_OBS_DISK_OFFSET; }
+
   /// Dumps the disk IoMgr queues (for readers and disks)
   std::string DebugString();
 
@@ -399,6 +402,7 @@ class DiskIoMgr : public CacheLineAligned {
     REMOTE_S3_DISK_FILE_OPER_OFFSET,
     REMOTE_SFS_DISK_OFFSET,
     REMOTE_OSS_DISK_OFFSET,
+    REMOTE_OBS_DISK_OFFSET,
     REMOTE_NUM_DISKS
   };
 
diff --git a/be/src/util/hdfs-util.cc b/be/src/util/hdfs-util.cc
index 46ab67af1..90f71470f 100644
--- a/be/src/util/hdfs-util.cc
+++ b/be/src/util/hdfs-util.cc
@@ -41,6 +41,7 @@ const char* FILESYS_PREFIX_OFS = "ofs://";
 const char* FILESYS_PREFIX_SFS = "sfs+";
 const char* FILESYS_PREFIX_OSS = "oss://";
 const char* FILESYS_PREFIX_JINDOFS = "jfs://";
+const char* FILESYS_PREFIX_OBS = "obs://";
 
 string GetHdfsErrorMsg(const string& prefix, const string& file) {
   string error_msg = GetStrErrMsg();
@@ -138,6 +139,10 @@ bool IsSFSPath(const char* path, bool check_default_fs) {
   return IsSpecificPath(path, FILESYS_PREFIX_SFS, check_default_fs);
 }
 
+bool IsOBSPath(const char* path, bool check_default_fs) {
+  return IsSpecificPath(path, FILESYS_PREFIX_OBS, check_default_fs);
+}
+
 // Returns the length of the filesystem name in 'path' which is the length of the
 // 'scheme://authority'. Returns 0 if the path is unqualified.
 static int GetFilesystemNameLength(const char* path) {
diff --git a/be/src/util/hdfs-util.h b/be/src/util/hdfs-util.h
index cb9629a50..b5d3cf607 100644
--- a/be/src/util/hdfs-util.h
+++ b/be/src/util/hdfs-util.h
@@ -84,6 +84,9 @@ bool IsOzonePath(const char* path, bool check_default_fs = true);
 /// Returns true iff the path refers to a location on an SFS filesystem.
 bool IsSFSPath(const char* path, bool check_default_fs = true);
 
+/// Returns true iff the path refers to a location on an OBS filesystem.
+bool IsOBSPath(const char* path, bool check_default_fs = true);
+
 /// Returns true iff 'pathA' and 'pathB' are on the same filesystem and bucket.
 /// Most filesystems embed bucket in the authority, but Ozone's ofs protocol allows
 /// addressing volume/bucket via the path and does not allow renames across them.
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index c954b15d2..1ac293676 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -250,6 +250,7 @@ export IMPALA_RELOAD4j_VERSION=1.2.22
 export IMPALA_SLF4J_VERSION=2.0.3
 export IMPALA_SPRINGFRAMEWORK_VERSION=5.3.20
 export IMPALA_XMLSEC_VERSION=2.2.3
+export IMPALA_OBS_VERSION=3.1.1-hw-42
 
 # When Impala is building docker images on Redhat-based distributions,
 # it is useful to be able to customize the base image. Some users will
@@ -699,6 +700,14 @@ elif [ "${TARGET_FILESYSTEM}" = "oss" ]; then
   fi
   DEFAULT_FS="oss://${OSS_BUCKET}"
   export DEFAULT_FS
+elif [ "${TARGET_FILESYSTEM}" = "obs" ]; then
+  # Basic error checking
+  OBS_ACCESS_KEY="${OBS_ACCESS_KEY:?OBS_ACCESS_KEY cannot be an empty string for OBS}"
+  OBS_SECRET_KEY="${OBS_SECRET_KEY:?OBS_SECRET_KEY cannot be an empty string for OBS}"
+  OBS_ENDPOINT="${OBS_ENDPOINT:?OBS_ENDPOINT cannot be an empty string for OBS}"
+  OBS_BUCKET="${OBS_BUCKET:?OBS_BUCKET cannot be an empty string for OBS}"
+  DEFAULT_FS="obs://${OBS_BUCKET}"
+  export OBS_ACCESS_KEY OBS_SECRET_KEY OBS_ENDPOINT DEFAULT_FS ENABLE_OBS_FILESYSTEM=true
 elif [ "${TARGET_FILESYSTEM}" = "isilon" ]; then
   if [ "${ISILON_NAMENODE}" = "" ]; then
     echo "In order to access the Isilon filesystem, ISILON_NAMENODE"
@@ -972,6 +981,7 @@ echo "IMPALA_RANGER_VERSION   = $IMPALA_RANGER_VERSION"
 echo "IMPALA_ICEBERG_VERSION  = $IMPALA_ICEBERG_VERSION"
 echo "IMPALA_COS_VERSION      = $IMPALA_COS_VERSION"
 echo "IMPALA_OSS_VERSION      = $IMPALA_OSS_VERSION"
+echo "IMPALA_OBS_VERSION      = $IMPALA_OBS_VERSION"
 
 # Kerberos things.  If the cluster exists and is kerberized, source
 # the required environment.  This is required for any hadoop tool to
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index e50fb85ba..bd88f4a98 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -84,6 +84,7 @@ public class FileSystemUtil {
   public static final String SCHEME_COS = "cosn";
   public static final String SCHEME_OSS = "oss";
   public static final String SCHEME_SFS = "sfs";
+  public static final String SCHEME_OBS = "obs";
 
   public static final String NO_ERASURE_CODE_LABEL = "NONE";
 
@@ -115,6 +116,7 @@ public class FileSystemUtil {
           .add(SCHEME_GCS)
           .add(SCHEME_COS)
           .add(SCHEME_OSS)
+          .add(SCHEME_OBS)
           .build();
 
   /**
@@ -131,6 +133,7 @@ public class FileSystemUtil {
           .add(SCHEME_GCS)
           .add(SCHEME_COS)
           .add(SCHEME_OSS)
+          .add(SCHEME_OBS)
           .build();
 
   /**
@@ -148,6 +151,7 @@ public class FileSystemUtil {
           .add(SCHEME_GCS)
           .add(SCHEME_COS)
           .add(SCHEME_OSS)
+          .add(SCHEME_OBS)
           .build();
 
   /**
@@ -565,6 +569,13 @@ public class FileSystemUtil {
     return hasScheme(fs, SCHEME_OSS);
   }
 
+  /**
+   * Returns true iff the filesystem is an OBSFileSystem.
+   */
+  public static boolean isOBSFileSystem(FileSystem fs) {
+    return hasScheme(fs, SCHEME_OBS);
+  }
+
   /**
    * Returns true iff the filesystem is AdlFileSystem.
    */
@@ -676,7 +687,8 @@ public class FileSystemUtil {
     GCS,
     COS,
     OSS,
-    SFS;
+    SFS,
+    OBS;
 
     private static final Map<String, FsType> SCHEME_TO_FS_MAPPING =
         ImmutableMap.<String, FsType>builder()
@@ -692,6 +704,7 @@ public class FileSystemUtil {
             .put(SCHEME_GCS, GCS)
             .put(SCHEME_COS, COS)
             .put(SCHEME_OSS, OSS)
+            .put(SCHEME_OBS, OBS)
             .build();
 
     /**
diff --git a/java/executor-deps/pom.xml b/java/executor-deps/pom.xml
index 428b78145..0847462e1 100644
--- a/java/executor-deps/pom.xml
+++ b/java/executor-deps/pom.xml
@@ -205,4 +205,41 @@ under the License.
       </plugin>
     </plugins>
   </build>
+
+  <profiles>
+    <profile>
+      <!--
+        This profile aims to include huawei OBS dependencies.
+        This profile is disabled by default.
+        To activate this profile, set environment variable 'ENABLE_OBS_FILESYSTEM' to 'true',
+        or use 'mvn package -Pobs-filesystem'.
+      -->
+      <id>obs-filesystem</id>
+      <activation>
+        <property>
+          <name>env.ENABLE_OBS_FILESYSTEM</name>
+          <value>true</value>
+        </property>
+      </activation>
+      <dependencies>
+        <!-- HuaweiCloud SDK. -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-huaweicloud</artifactId>
+            <version>${obs.version}</version>
+        </dependency>
+      </dependencies>
+      <repositories>
+        <!-- HuaweiCloud SDK repository. -->
+        <repository>
+          <id>huaweicloud.repo</id>
+          <url>https://repo.huaweicloud.com/repository/maven/huaweicloudsdk</url>
+          <name>HuaweiCloud SDK Repository</name>
+          <snapshots>
+            <enabled>false</enabled>
+          </snapshots>
+        </repository>
+      </repositories>
+    </profile>
+  </profiles>
 </project>
diff --git a/java/pom.xml b/java/pom.xml
index c84571551..76364f06d 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -52,6 +52,7 @@ under the License.
     <knox.version>${env.IMPALA_KNOX_VERSION}</knox.version>
     <cos.version>${env.IMPALA_COS_VERSION}</cos.version>
     <oss.version>${env.IMPALA_OSS_VERSION}</oss.version>
+    <obs.version>${env.IMPALA_OBS_VERSION}</obs.version>
     <thrift.version>${env.IMPALA_THRIFT_POM_VERSION}</thrift.version>
     <impala.extdatasrc.api.version>${project.version}</impala.extdatasrc.api.version>
     <impala.query.event.hook.api.version>${project.version}</impala.query.event.hook.api.version>
diff --git a/testdata/bin/create-load-data.sh b/testdata/bin/create-load-data.sh
index 8b5f67859..3c8f6fc51 100755
--- a/testdata/bin/create-load-data.sh
+++ b/testdata/bin/create-load-data.sh
@@ -119,8 +119,8 @@ fi
 TIMEOUT_PID=$!
 
 SCHEMA_MISMATCH_ERROR="A schema change has been detected in the metadata, "
-SCHEMA_MISMATCH_ERROR+="but it cannot be loaded on Isilon, s3, gcs, cos, oss or local "
-SCHEMA_MISMATCH_ERROR+="filesystem, and the filesystem is ${TARGET_FILESYSTEM}".
+SCHEMA_MISMATCH_ERROR+="but it cannot be loaded on Isilon, s3, gcs, cos, oss, obs or "
+SCHEMA_MISMATCH_ERROR+="local filesystem, and the filesystem is ${TARGET_FILESYSTEM}".
 
 if [[ $SKIP_METADATA_LOAD -eq 0  && "$SNAPSHOT_FILE" = "" ]]; then
   run-step "Generating HBase data" create-hbase.log \
@@ -135,7 +135,8 @@ elif [ $SKIP_SNAPSHOT_LOAD -eq 0 ]; then
   if ! ${IMPALA_HOME}/testdata/bin/check-schema-diff.sh; then
     if [[ "${TARGET_FILESYSTEM}" == "isilon" || "${TARGET_FILESYSTEM}" == "s3" || \
           "${TARGET_FILESYSTEM}" == "local" || "${TARGET_FILESYSTEM}" == "gs" || \
-          "${TARGET_FILESYSTEM}" == "cosn" || "${TARGET_FILESYSTEM}" == "oss" ]] ; then
+          "${TARGET_FILESYSTEM}" == "cosn" || "${TARGET_FILESYSTEM}" == "oss" || \
+          "${TARGET_FILESYSTEM}" == "obs" ]] ; then
       echo "ERROR in $0 at line $LINENO: A schema change has been detected in the"
       echo "metadata, but it cannot be loaded on isilon, s3, gcs, cos, oss or local"
       echo "and the target file system is ${TARGET_FILESYSTEM}.  Exiting."
diff --git a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
index bcb17edfa..0ebc97372 100644
--- a/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
+++ b/testdata/cluster/node_templates/common/etc/hadoop/conf/core-site.xml.py
@@ -138,6 +138,15 @@ if target_filesystem == 's3':
       'fs.s3a.s3guard.ddb.region': '${S3GUARD_DYNAMODB_REGION}',
     })
 
+if target_filesystem == 'obs':
+  CONFIG.update({
+    'fs.obs.impl': 'org.apache.hadoop.fs.obs.OBSFileSystem',
+    'fs.AbstractFileSystem.obs.impl': 'org.apache.hadoop.fs.obs.OBS',
+    'fs.obs.access.key': '${OBS_ACCESS_KEY}',
+    'fs.obs.secret.key': '${OBS_SECRET_KEY}',
+    'fs.obs.endpoint': '${OBS_ENDPOINT}',
+    })
+
 if target_filesystem == 'ozone':
   CONFIG.update({'fs.ofs.impl': 'org.apache.hadoop.fs.ozone.RootedOzoneFileSystem'})
 
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index b88b629b2..0a605a0d7 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -70,6 +70,7 @@ from tests.util.filesystem_utils import (
     IS_GCS,
     IS_COS,
     IS_OSS,
+    IS_OBS,
     IS_HDFS,
     S3_BUCKET_NAME,
     S3GUARD_ENABLED,
@@ -275,6 +276,9 @@ class ImpalaTestSuite(BaseTestSuite):
     elif IS_OSS:
       # OSS is implemented via HDFS command line client
       cls.filesystem_client = HadoopFsCommandLineClient("OSS")
+    elif IS_OBS:
+      # OBS is implemented via HDFS command line client
+      cls.filesystem_client = HadoopFsCommandLineClient("OBS")
     elif IS_OZONE:
       cls.filesystem_client = HadoopFsCommandLineClient("Ozone")
 
@@ -1067,7 +1071,7 @@ class ImpalaTestSuite(BaseTestSuite):
     # If 'skip_hbase' is specified or the filesystem is isilon, s3, GCS(gs), COS(cosn) or
     # local, we don't need the hbase dimension.
     if pytest.config.option.skip_hbase or TARGET_FILESYSTEM.lower() \
-        in ['s3', 'isilon', 'local', 'abfs', 'adls', 'gs', 'cosn', 'ozone']:
+        in ['s3', 'isilon', 'local', 'abfs', 'adls', 'gs', 'cosn', 'ozone', 'obs']:
       for tf_dimension in tf_dimensions:
         if tf_dimension.value.file_format == "hbase":
           tf_dimensions.remove(tf_dimension)
diff --git a/tests/common/skip.py b/tests/common/skip.py
index 863ed0a26..b908ea9e8 100644
--- a/tests/common/skip.py
+++ b/tests/common/skip.py
@@ -34,6 +34,7 @@ from tests.util.filesystem_utils import (
     IS_GCS,
     IS_COS,
     IS_OSS,
+    IS_OBS,
     IS_EC,
     IS_HDFS,
     IS_ISILON,
@@ -62,9 +63,9 @@ class SkipIfFS:
       reason="Empty directories are not supported on S3")
   file_or_folder_name_ends_with_period = pytest.mark.skipif(IS_ABFS,
       reason="ABFS does not support file / directories that end with a period")
-  stress_insert_timeouts = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS,
+  stress_insert_timeouts = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS or IS_OBS,
       reason="IMPALA-10563, IMPALA-10773")
-  shutdown_idle_fails = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS,
+  shutdown_idle_fails = pytest.mark.skipif(IS_COS or IS_GCS or IS_OSS or IS_OBS,
       reason="IMPALA-10562")
   late_filters = pytest.mark.skipif(IS_ISILON, reason="IMPALA-6998")
   read_past_eof = pytest.mark.skipif(IS_S3 or IS_GCS or (IS_OZONE and IS_EC),
@@ -81,9 +82,9 @@ class SkipIfFS:
       reason="Tests rely on HDFS qualified paths, IMPALA-1872")
   no_partial_listing = pytest.mark.skipif(not IS_HDFS,
       reason="Tests rely on HDFS partial listing.")
-  variable_listing_times = pytest.mark.skipif(IS_S3 or IS_GCS or IS_COS or IS_OSS,
-      reason="Flakiness due to unpredictable listing times on S3.")
-  eventually_consistent = pytest.mark.skipif(IS_ADLS or IS_COS or IS_OSS,
+  variable_listing_times = pytest.mark.skipif(IS_S3 or IS_GCS or IS_COS or IS_OSS
+      or IS_OBS, reason="Flakiness due to unpredictable listing times on S3.")
+  eventually_consistent = pytest.mark.skipif(IS_ADLS or IS_COS or IS_OSS or IS_OBS,
       reason="The client is slow to realize changes to file metadata")
 
 class SkipIfKudu:
diff --git a/tests/custom_cluster/test_metastore_service.py b/tests/custom_cluster/test_metastore_service.py
index 05c7a0895..62a621a73 100644
--- a/tests/custom_cluster/test_metastore_service.py
+++ b/tests/custom_cluster/test_metastore_service.py
@@ -29,8 +29,7 @@ from hive_metastore.ttypes import SerDeInfo
 from tests.util.event_processor_utils import EventProcessorUtils
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.util.filesystem_utils import (IS_S3, IS_ADLS, IS_GCS, IS_COS, IS_OSS)
-
+from tests.util.filesystem_utils import IS_HDFS, IS_OZONE
 
 class TestMetastoreService(CustomClusterTestSuite):
     """
@@ -1210,9 +1209,9 @@ class TestMetastoreService(CustomClusterTestSuite):
       test_part_names = list(part_names)
       if expect_files:
         assert get_parts_by_names_result.dictionary is not None
-        # obj_dict will only be populated when the table is on HDFS
+        # obj_dict will only be populated when the table is on HDFS or OZONE
         # where block locations are available.
-        if not IS_S3 and not IS_GCS and not IS_COS and not IS_ADLS and not IS_OSS:
+        if IS_HDFS or IS_OZONE:
           assert len(get_parts_by_names_result.dictionary.values) > 0
       else:
         assert get_parts_by_names_result.dictionary is None
@@ -1238,9 +1237,9 @@ class TestMetastoreService(CustomClusterTestSuite):
       assert filemetadata is not None
       assert filemetadata.data is not None
       assert obj_dict is not None
-      # obj_dict will only be populated when the table is on HDFS
+      # obj_dict will only be populated when the table is on HDFS or OZONE
       # where block locations are available.
-      if not IS_S3 and not IS_GCS and not IS_COS and not IS_ADLS and not IS_OSS:
+      if IS_HDFS or IS_OZONE:
         assert len(obj_dict.values) > 0
 
     def __assert_no_filemd(self, filemetadata, obj_dict):
diff --git a/tests/util/filesystem_utils.py b/tests/util/filesystem_utils.py
index 7444db7e8..fe13a807e 100644
--- a/tests/util/filesystem_utils.py
+++ b/tests/util/filesystem_utils.py
@@ -34,6 +34,7 @@ IS_ABFS = FILESYSTEM == "abfs"
 IS_GCS = FILESYSTEM == "gs"
 IS_COS = FILESYSTEM == "cosn"
 IS_OSS = FILESYSTEM == "oss"
+IS_OBS = FILESYSTEM == "obs"
 IS_OZONE = FILESYSTEM == "ozone"
 IS_EC = os.getenv("ERASURE_CODING") == "true"
 IS_ENCRYPTED = os.getenv("USE_OZONE_ENCRYPTION") == "true"
@@ -61,7 +62,8 @@ ADLS_CLIENT_SECRET = os.getenv("azure_client_secret")
 
 # A map of FILESYSTEM values to their corresponding Scan Node types
 fs_to_name = {'s3': 'S3', 'hdfs': 'HDFS', 'local': 'LOCAL', 'adls': 'ADLS',
-              'abfs': 'ADLS', 'gs': 'GCS', 'cosn': 'COS', 'ozone': 'OZONE', 'oss': 'OSS'}
+              'abfs': 'ADLS', 'gs': 'GCS', 'cosn': 'COS', 'ozone': 'OZONE',
+              'oss': 'OSS', 'obs': 'OBS'}
 
 
 def get_fs_name(fs):


[impala] 02/02: IMPALA-11850 Adds HTTP tracing headers when using the hs2-http protocol.

Posted by bo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

boroknagyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e17fd9a0d5428306dfa41a041a44c800824d72f6
Author: jasonmfehr <jf...@cloudera.com>
AuthorDate: Wed Dec 14 00:37:12 2022 +0000

    IMPALA-11850 Adds HTTP tracing headers when using the hs2-http protocol.
    
    When using the hs2 protocol with the http transport, include several
    tracing http headers by default.  These headers are:
    
      * X-Request-Id        -- client defined string that identifies the
                               http request, this string is meaningful only
                               to the client
      * X-Impala-Session-Id -- session id generated by the Impala backend,
                               will be omitted on http calls that occur
                               before this id has been generated
      * X-Impala-Query-Id   -- query id generated by the Impala backend,
                               will be omitted on http calls that occur
                               before this id has been generated
    
    The Impala shell includes these headers by default.  The command
    line argument --no_http_tracing has been added to remove these
    headers.
    
    The Impala backend logs these headers if they are present on the http
    request.  The log messages are written out at log level 2 (RPC).
    
    Testing:
      - manual testing (verified using debugging proxy and impala logs)
      - new python test
    
    Change-Id: I7857eb5ec03eba32e06ec8d4133480f2e958ad2f
    Reviewed-on: http://gerrit.cloudera.org:8080/19428
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/transport/THttpServer.cpp               |  22 ++++
 be/src/transport/THttpServer.h                 |  17 +++
 shell/ImpalaHttpClient.py                      |  29 +++++
 shell/impala_client.py                         |  39 +++++-
 shell/impala_shell.py                          |  15 ++-
 shell/impala_shell_config_defaults.py          |   3 +-
 shell/option_parser.py                         |   5 +
 tests/common/test_dimensions.py                |   4 +
 tests/custom_cluster/test_shell_commandline.py | 169 +++++++++++++++++++++++++
 9 files changed, 295 insertions(+), 8 deletions(-)

diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index 8189d16c2..8498ac1f2 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -33,6 +33,7 @@
 
 #include "gen-cpp/Frontend_types.h"
 #include "util/metrics.h"
+#include "common/logging.h"
 
 DECLARE_bool(trusted_domain_use_xff_header);
 DECLARE_bool(saml2_ee_test_mode);
@@ -125,6 +126,10 @@ THttpServer::~THttpServer() {
   #define THRIFT_strcasestr(haystack, needle) strcasestr(haystack, needle)
 #endif
 
+const std::string THttpServer::HEADER_REQUEST_ID = "X-Request-Id";
+const std::string THttpServer::HEADER_IMPALA_SESSION_ID = "X-Impala-Session-Id";
+const std::string THttpServer::HEADER_IMPALA_QUERY_ID = "X-Impala-Query-Id";
+
 void THttpServer::parseHeader(char* header) {
   char* colon = strchr(header, ':');
   if (colon == NULL) {
@@ -169,6 +174,15 @@ void THttpServer::parseHeader(char* header) {
   } else if (check_trusted_auth_header_
       && THRIFT_strncasecmp(header, FLAGS_trusted_auth_header.c_str(), sz) == 0) {
     found_trusted_auth_header_ = true;
+  } else if (THRIFT_strncasecmp(header, HEADER_REQUEST_ID.c_str(), sz) == 0) {
+    header_x_request_id_ = string(value);
+    StripWhiteSpace(&header_x_request_id_);
+  } else if (THRIFT_strncasecmp(header, HEADER_IMPALA_SESSION_ID.c_str(), sz) == 0) {
+    header_x_session_id_ = string(value);
+    StripWhiteSpace(&header_x_session_id_);
+  } else if (THRIFT_strncasecmp(header, HEADER_IMPALA_QUERY_ID.c_str(), sz) == 0) {
+    header_x_query_id_ = string(value);
+    StripWhiteSpace(&header_x_query_id_);
   }
 }
 
@@ -229,6 +243,14 @@ bool THttpServer::parseStatusLine(char* status) {
 }
 
 void THttpServer::headersDone() {
+  if (!header_x_request_id_.empty() || !header_x_session_id_.empty() ||
+      !header_x_query_id_.empty()) {
+    VLOG_RPC << "HTTP Connection Tracing Headers"
+        << (header_x_request_id_.empty() ? "" : " x-request-id=" + header_x_request_id_)
+        << (header_x_session_id_.empty() ? "" : " x-session-id=" + header_x_session_id_)
+        << (header_x_query_id_.empty() ? "" : " x-query-id=" + header_x_query_id_);
+  }
+
   if (!has_ldap_ && !has_kerberos_ && !has_saml_ && !has_jwt_) {
     // We don't need to authenticate.
     resetAuthState();
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
index f37620db3..b623e6491 100644
--- a/be/src/transport/THttpServer.h
+++ b/be/src/transport/THttpServer.h
@@ -152,6 +152,14 @@ public:
   void setCallbacks(const HttpCallbacks& callbacks) { callbacks_ = callbacks; }
 
 protected:
+  // Names of HTTP headers that are meaningful.
+  // Client-defined string identifying the HTTP request, meaningful only to the client.
+  static const std::string HEADER_REQUEST_ID;
+  // Impala session id specified by the Impala backend.  Used for tracing HTTP requests.
+  static const std::string HEADER_IMPALA_SESSION_ID;
+  // Impala query id specified by the Impala backend.  Used for tracing HTTP requests.
+  static const std::string HEADER_IMPALA_QUERY_ID;
+
   void readHeaders();
   virtual void parseHeader(char* header);
   virtual bool parseStatusLine(char* status);
@@ -223,6 +231,15 @@ protected:
   // Used to collect all information about the http request. Can be passed to the
   // Frontend. Currently only used by SAML SSO.
   impala::TWrappedHttpRequest* wrapped_request_ = nullptr;
+
+  // The value from the 'X-Request-Id' header.
+  std::string header_x_request_id_ = "";
+
+  // The value from the 'X-Impala-Session-Id' header.
+  std::string header_x_session_id_ = "";
+
+  // The value from the 'X-Impala-Query-Id' header.
+  std::string header_x_query_id_ = "";
 };
 
 /**
diff --git a/shell/ImpalaHttpClient.py b/shell/ImpalaHttpClient.py
index 5d7de079c..88cc0f054 100644
--- a/shell/ImpalaHttpClient.py
+++ b/shell/ImpalaHttpClient.py
@@ -133,6 +133,7 @@ class ImpalaHttpClient(TTransportBase):
     self.__get_custom_headers_func = None
     self.__basic_auth = None
     self.__kerb_service = None
+    self.__add_custom_headers_funcs = []
 
   @staticmethod
   def basic_proxy_auth_header(proxy):
@@ -227,13 +228,41 @@ class ImpalaHttpClient(TTransportBase):
     # auth mechanism: None
     self.__get_custom_headers_func = self.getCustomHeadersWithoutAuth
 
+  # Whenever http(s) calls are made to the backend impala, each function
+  # added through this method will be called.  Thus, arbitrary custom
+  # headers can be set on each request.
+  # parameters:
+  #  funcs - tuple of functions where each takes no arguments and returns
+  #      a dict of http headers
+  # Note:  refreshCustomHeaders asserts that every returned header name starts
+  # with "X-" or "x-" and is not already present in the custom headers.
+  def addCustomHeaderFunc(self, *funcs):
+    if funcs is None:
+      return
+
+    for f in funcs:
+      self.__add_custom_headers_funcs.append(f)
+
   # Update HTTP headers based on the saved cookies and auth mechanism.
   def refreshCustomHeaders(self):
+    self.__custom_headers = {}
+
     if self.__get_custom_headers_func:
       cookie_header, has_auth_cookie = self.getHttpCookieHeaderForRequest()
       self.__custom_headers = \
           self.__get_custom_headers_func(cookie_header, has_auth_cookie)
 
+    for f in self.__add_custom_headers_funcs:
+      headers = f()
+      if headers is not None:
+        for key in headers:
+          assert key[0:2].lower() == "x-", \
+            "header '{0}' is not valid, all custom headers must start with "\
+            "'X-' or 'x-'".format(key)
+          assert key not in self.__custom_headers, \
+            "header '{0}' already exists in custom headers dictionary".format(key)
+          self.__custom_headers[key] = headers[key]
+
   # Return first value as a cookie list for Cookie header. It's a list of name-value
   # pairs in the form of <cookie-name>=<cookie-value>. Pairs in the list are separated by
   # a semicolon and a space ('; ').
diff --git a/shell/impala_client.py b/shell/impala_client.py
index e2a058a47..af6739cd0 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -30,6 +30,7 @@ import ssl
 import sys
 import time
 from datetime import datetime
+import uuid
 
 from beeswaxd import BeeswaxService
 from beeswaxd.BeeswaxService import QueryState
@@ -133,7 +134,7 @@ class ImpalaClient(object):
                ldap_password=None, use_ldap=False, client_connect_timeout_ms=60000,
                verbose=True, use_http_base_transport=False, http_path=None,
                http_cookie_names=None, http_socket_timeout_s=None, value_converter=None,
-               connect_max_tries=4, rpc_stdout=False, rpc_file=None):
+               connect_max_tries=4, rpc_stdout=False, rpc_file=None, http_tracing=True):
     self.connected = False
     self.impalad_host = impalad[0]
     self.impalad_port = int(impalad[1])
@@ -155,6 +156,7 @@ class ImpalaClient(object):
     self.use_http_base_transport = use_http_base_transport
     self.http_path = http_path
     self.http_cookie_names = http_cookie_names
+    self.http_tracing = http_tracing
     # This is set from ImpalaShell's signal handler when a query is cancelled
     # from command line via CTRL+C. It is used to suppress error messages of
     # query cancellation.
@@ -429,6 +431,8 @@ class ImpalaClient(object):
     else:
       transport.setNoneAuth()
 
+    transport.addCustomHeaderFunc(self.get_custom_http_headers)
+
     # Without buffering Thrift would call socket.recv() each time it deserializes
     # something (e.g. a member in a struct).
     transport = TBufferedTransport(transport)
@@ -643,6 +647,12 @@ class ImpalaClient(object):
     if not self.connected:
       raise DisconnectedException("Not connected (use CONNECT to establish a connection)")
 
+  def get_custom_http_headers(self):
+    # When the transport is http, subclasses can override this function
+    # to add arbitrary http headers.
+    return None
+
+
 class ImpalaHS2Client(ImpalaClient):
   """Impala client. Uses the HS2 protocol plus Impala-specific extensions."""
   def __init__(self, *args, **kwargs):
@@ -672,6 +682,9 @@ class ImpalaHS2Client(ImpalaClient):
     if self.rpc_stdout or self.rpc_stdout is not None:
       self.thrift_printer = ThriftPrettyPrinter()
 
+    self._base_request_id = str(uuid.uuid1())
+    self._request_num = 0
+
   def _get_thrift_client(self, protocol):
     return ImpalaHiveServer2Service.Client(protocol)
 
@@ -697,6 +710,25 @@ class ImpalaHS2Client(ImpalaClient):
 
     self._populate_query_options()
 
+  def get_custom_http_headers(self):
+    headers = {}
+
+    if self.http_tracing:
+      session_id = self.get_session_id()
+      if session_id is not None:
+        headers["X-Impala-Session-Id"] = session_id
+
+      current_query_id = self.get_query_id_str(self._current_query_handle)
+      if current_query_id is not None:
+        headers["X-Impala-Query-Id"] = current_query_id
+
+      assert getattr(self, "_current_request_id", None) is not None, \
+        "request id was not set"
+      headers["X-Request-Id"] = self._current_request_id
+
+    return headers
+
+
   def close_connection(self):
     if self.session_handle is not None:
       # Attempt to close session explicitly. Do not fail if there is an error
@@ -813,7 +845,6 @@ class ImpalaHS2Client(ImpalaClient):
         # Attach the schema to the handle for convenience.
         handle.schema = resp.schema
       handle.is_closed = False
-      self._clear_current_query_handle()
       return handle
     finally:
       self._clear_current_query_handle()
@@ -1075,6 +1106,10 @@ class ImpalaHS2Client(ImpalaClient):
     If 'retry_on_error' is true, the rpc is retried if an exception is raised. The maximum
     number of tries is determined by 'self.max_tries'. Retries, if enabled, are attempted
     for all exceptions other than TApplicationException."""
+
+    self._request_num += 1
+    self._current_request_id = "{0}-{1}".format(self._base_request_id, self._request_num)
+
     self._check_connected()
     num_tries = 1
     max_tries = num_tries
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 5101a065e..066dcb0ad 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -263,6 +263,7 @@ class ImpalaShell(cmd.Cmd, object):
     self.http_path = options.http_path
     self.fetch_size = options.fetch_size
     self.http_cookie_names = options.http_cookie_names
+    self.http_tracing = not options.no_http_tracing
 
     # Due to a readline bug in centos/rhel7, importing it causes control characters to be
     # printed. This breaks any scripting against the shell in non-interactive mode. Since
@@ -610,7 +611,8 @@ class ImpalaShell(cmd.Cmd, object):
                           self.client_connect_timeout_ms, self.verbose,
                           use_http_base_transport=False, http_path=self.http_path,
                           http_cookie_names=None, value_converter=value_converter,
-                          rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file)
+                          rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file,
+                          http_tracing=self.http_tracing)
       elif protocol == 'hs2-http':
         return StrictHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn,
                           self.use_kerberos, self.kerberos_service_name, self.use_ssl,
@@ -619,7 +621,7 @@ class ImpalaShell(cmd.Cmd, object):
                           use_http_base_transport=True, http_path=self.http_path,
                           http_cookie_names=self.http_cookie_names,
                           value_converter=value_converter, rpc_stdout=self.rpc_stdout,
-                          rpc_file=self.rpc_file)
+                          rpc_file=self.rpc_file, http_tracing=self.http_tracing)
     if protocol == 'hs2':
       return ImpalaHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn,
                           self.use_kerberos, self.kerberos_service_name, self.use_ssl,
@@ -627,7 +629,8 @@ class ImpalaShell(cmd.Cmd, object):
                           self.client_connect_timeout_ms, self.verbose,
                           use_http_base_transport=False, http_path=self.http_path,
                           http_cookie_names=None, value_converter=value_converter,
-                          rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file)
+                          rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file,
+                          http_tracing=self.http_tracing)
     elif protocol == 'hs2-http':
       return ImpalaHS2Client(self.impalad, self.fetch_size, self.kerberos_host_fqdn,
                           self.use_kerberos, self.kerberos_service_name, self.use_ssl,
@@ -638,7 +641,8 @@ class ImpalaShell(cmd.Cmd, object):
                           http_socket_timeout_s=self.http_socket_timeout_s,
                           value_converter=value_converter,
                           connect_max_tries=self.connect_max_tries,
-                          rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file)
+                          rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file,
+                          http_tracing=self.http_tracing)
     elif protocol == 'beeswax':
       return ImpalaBeeswaxClient(self.impalad, self.fetch_size, self.kerberos_host_fqdn,
                           self.use_kerberos, self.kerberos_service_name, self.use_ssl,
@@ -1346,7 +1350,8 @@ class ImpalaShell(cmd.Cmd, object):
         num_rows = 0
 
         for rows in rows_fetched:
-          # IMPALA-4418: Break out of the loop to prevent printing an unnecessary empty line.
+          # IMPALA-4418: Skip empty row batches to prevent printing an unnecessary
+          # empty line.
           if len(rows) == 0:
             continue
           self.output_stream.write(rows)
diff --git a/shell/impala_shell_config_defaults.py b/shell/impala_shell_config_defaults.py
index 59f6cd043..b07cd86a6 100644
--- a/shell/impala_shell_config_defaults.py
+++ b/shell/impala_shell_config_defaults.py
@@ -60,5 +60,6 @@ impala_shell_defaults = {
             'http_socket_timeout_s': None,
             'global_config_default_path': '/etc/impalarc',
             'strict_hs2_protocol': False,
-            'hs2_fp_format': None
+            'hs2_fp_format': None,
+            'no_http_tracing': False
     }
diff --git a/shell/option_parser.py b/shell/option_parser.py
index 8d28a7404..b385be854 100755
--- a/shell/option_parser.py
+++ b/shell/option_parser.py
@@ -333,6 +333,11 @@ def get_option_parser(defaults):
                     "returned in an http response by the server or an intermediate proxy "
                     "then it will be included in each subsequent request for the same "
                     "connection.")
+  parser.add_option("--no_http_tracing", dest="no_http_tracing",
+                    action="store_true",
+                    help="Tracing http headers 'X-Request-Id', 'X-Impala-Session-Id', "
+                    "and 'X-Impala-Query-Id' will not be added to each http request "
+                    "(hs2-http protocol only).")
   parser.add_option("--hs2_fp_format", type="str",
                     dest="hs2_fp_format", default=None,
                     help="Sets the printing format specification for floating point "
diff --git a/tests/common/test_dimensions.py b/tests/common/test_dimensions.py
index e551d572b..7c4c3b2b5 100644
--- a/tests/common/test_dimensions.py
+++ b/tests/common/test_dimensions.py
@@ -137,6 +137,10 @@ def create_client_protocol_dimension():
   return ImpalaTestDimension('protocol', 'beeswax', 'hs2', 'hs2-http')
 
 
+def create_client_protocol_http_transport():
+  return ImpalaTestDimension('protocol', 'hs2-http')
+
+
 def create_client_protocol_strict_dimension():
   # only support strict dimensions if the file system is HDFS, since that is
   # where the hive cluster is run.
diff --git a/tests/custom_cluster/test_shell_commandline.py b/tests/custom_cluster/test_shell_commandline.py
new file mode 100644
index 000000000..c7379a93d
--- /dev/null
+++ b/tests/custom_cluster/test_shell_commandline.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import pytest
+import re
+import tempfile
+
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.test_dimensions import create_client_protocol_http_transport
+from time import sleep
+from tests.shell.util import run_impala_shell_cmd
+
+
+class TestImpalaShellCommandLine(CustomClusterTestSuite):
+  """Runs tests of the Impala shell by first standing up an Impala cluster with
+  specific startup flags.  Then, the Impala shell is launched with specific arguments
+  in a separate process.  Assertions are done by scanning the shell output and Impala
+  server logs for expected strings."""
+
+  LOG_DIR_HTTP_TRACING = tempfile.mkdtemp(prefix="http_tracing")
+  LOG_DIR_HTTP_TRACING_OFF = tempfile.mkdtemp(prefix="http_tracing_off")
+  IMPALA_ID_RE = "([0-9a-f]{16}:[0-9a-f]{16})"
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_http_transport())
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("-log_dir={0} -v 2".format(LOG_DIR_HTTP_TRACING))
+  def test_http_tracing_headers(self, vector):
+    """Asserts that tracing headers are automatically added by the impala shell to
+    all calls to the backend impala engine made using the hs2 over http protocol.
+    The impala coordinator logs are searched to ensure these tracing headers were added
+    and also were passed through to the coordinator."""
+    args = ['--protocol', 'hs2-http', '-q', 'select version();profile']
+    result = run_impala_shell_cmd(vector, args)
+
+    # Shut down cluster to ensure logs flush to disk.
+    sleep(5)
+    self._stop_impala_cluster()
+
+    # Ensure the query ran successfully.
+    assert result.stdout.find("version()") > -1
+    assert result.stdout.find("impalad version") > -1
+    assert result.stdout.find("Query Runtime Profile") > -1
+
+    request_id_base = ""
+    request_id_serialnum = 0
+    session_id = ""
+    query_id = ""
+    last_known_query_id = ""
+    tracing_lines_count = 0
+
+    request_id_re = re.compile("x-request-id=([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
+                               "[0-9a-f]{4}-[0-9a-f]{12})-(\\d+)")
+    session_id_re = re.compile("x-session-id={0}"
+                               .format(TestImpalaShellCommandLine.IMPALA_ID_RE))
+    query_id_re = re.compile("x-query-id={0}"
+                               .format(TestImpalaShellCommandLine.IMPALA_ID_RE))
+    profile_query_id_re = re.compile("Query \\(id={0}\\)"
+                               .format(TestImpalaShellCommandLine.IMPALA_ID_RE))
+
+    # Find all HTTP Connection Tracing log lines.
+    with open(os.path.join(self.LOG_DIR_HTTP_TRACING, "impalad.INFO")) as log_file:
+      for line in log_file:
+        if line.find("HTTP Connection Tracing Headers") > -1:
+          tracing_lines_count += 1
+
+          # The impala shell builds each request_id from one base uuid shared by all
+          # of its requests plus a serially increasing integer appended on the end.
+          # Ensure both of these conditions are met.
+          m = request_id_re.search(line)
+          assert m is not None, \
+            "did not find request id in HTTP connection tracing log line '{0}'" \
+            .format(line)
+
+          if request_id_base == "":
+            # The current line is the very first HTTP connection tracing line in the logs.
+            request_id_base = m.group(1)
+          else:
+            assert request_id_base == m.group(1), \
+              "base request id expected '{0}', actual '{1}'" \
+              .format(request_id_base, m.group(1))
+
+          request_id_serialnum += 1
+          assert request_id_serialnum == int(m.group(2)), \
+            "request id serial number expected '{0}', actual '{1}'" \
+            .format(request_id_serialnum, m.group(2))
+
+          # The session_id is generated by impala and must be the same once it
+          # appears in a tracing log line.
+          m = session_id_re.search(line)
+          if m is not None:
+            if session_id == "":
+              session_id = m.group(1)
+            else:
+              assert session_id == m.group(1), \
+                "session id expected '{0}', actual '{1}'".format(session_id, m.group(1))
+
+          # The query_id is generated by impala and must be the same for the
+          # duration of the query.
+          m = query_id_re.search(line)
+          if m is None:
+            query_id = ""
+          else:
+            if query_id == "":
+              query_id = m.group(1)
+              last_known_query_id = query_id
+            else:
+              assert query_id == m.group(1), \
+                "query id expected '{0}', actual '{1}'".format(query_id, m.group(1))
+
+    # Assert that more than 10 HTTP connection tracing log lines were found.
+    assert tracing_lines_count > 10, \
+      "did not find enough HTTP connection tracing log lines, found {0} lines" \
+      .format(tracing_lines_count)
+
+    # Ensure the last found query id matches the actual query id
+    # from the impala query profile.
+    m = profile_query_id_re.search(result.stdout)
+    if m is not None:
+      assert last_known_query_id == m.group(1), \
+        "impala query profile id, expected '{0}', actual '{1}'" \
+        .format(last_known_query_id, m.group(1))
+    else:
+      pytest.fail("did not find Impala query id in shell stdout")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args("-log_dir={0} -v 2".format(LOG_DIR_HTTP_TRACING_OFF))
+  def test_http_tracing_headers_off(self, vector):
+    """Asserts the impala shell command line parameter to prevent the addition of http
+    tracing headers actually leaves out those tracing headers."""
+    args = ['--protocol', 'hs2-http', '--no_http_tracing',
+            '-q', 'select version();profile']
+    result = run_impala_shell_cmd(vector, args)
+
+    # Shut down cluster to ensure logs flush to disk.
+    sleep(5)
+    self._stop_impala_cluster()
+
+    # Ensure the query ran successfully.
+    assert result.stdout.find("version()") > -1
+    assert result.stdout.find("impalad version") > -1
+    assert result.stdout.find("Query Runtime Profile") > -1
+
+    # Find all HTTP Connection Tracing log lines (there should not be any).
+    with open(os.path.join(self.LOG_DIR_HTTP_TRACING_OFF, "impalad.INFO")) as log_file:
+      for line in log_file:
+        if line.find("HTTP Connection Tracing Headers") != -1:
+          pytest.fail("found HTTP connection tracing line line: {0}".format(line))