Posted to commits@impala.apache.org by st...@apache.org on 2020/10/24 10:27:54 UTC

[impala] branch master updated (cfa8a7a -> 3ba8d63)

This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git.


    from cfa8a7a  IMPALA-10278: Use full libraries for impalad_executor Docker container
     new 4b5c66f  IMPALA-10266: Identify FileSystem type based on the protocol scheme.
     new fd1ea14  IMPALA-10210: Skip Authentication for connection from a trusted domain
     new 2a1d3ac  IMPALA-9870: impala-shell 'summary' to show original and retried queries
     new 3ba8d63  IMPALA-10256: Skip test_disable_incremental_metadata_updates on S3 tests

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/rpc/CMakeLists.txt                          |   2 +-
 .../rpc/{cookie-util.cc => authentication-util.cc} |  47 ++++++-
 .../rpc/{cookie-util.h => authentication-util.h}   |  14 ++
 be/src/rpc/authentication.cc                       |  67 ++++++---
 be/src/service/impala-hs2-server.cc                |   9 +-
 be/src/service/impala-server.cc                    |  34 ++++-
 be/src/service/impala-server.h                     |   8 +-
 be/src/transport/THttpServer.cpp                   |  31 +++-
 be/src/transport/THttpServer.h                     |  25 +++-
 be/src/util/webserver.cc                           |  74 ++++++++--
 be/src/util/webserver.h                            |  13 ++
 common/thrift/ImpalaService.thrift                 |  11 +-
 common/thrift/metrics.json                         |  22 +++
 .../org/apache/impala/analysis/LoadDataStmt.java   |  20 +--
 .../impala/catalog/CatalogServiceCatalog.java      |   4 +-
 .../org/apache/impala/common/FileSystemUtil.java   | 156 ++++++++++++++++-----
 .../org/apache/impala/service/JniFrontend.java     |  11 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |   6 +-
 .../apache/impala/common/FileSystemUtilTest.java   | 137 +++++++++++++++++-
 .../apache/impala/customcluster/LdapHS2Test.java   | 104 +++++++++++++-
 .../impala/customcluster/LdapWebserverTest.java    |  68 ++++++++-
 shell/impala_client.py                             |  22 ++-
 shell/impala_shell.py                              |  70 ++++++---
 tests/custom_cluster/test_disable_features.py      |   2 +
 tests/custom_cluster/test_shell_interactive.py     |  23 ++-
 25 files changed, 833 insertions(+), 147 deletions(-)
 rename be/src/rpc/{cookie-util.cc => authentication-util.cc} (81%)
 rename be/src/rpc/{cookie-util.h => authentication-util.h} (65%)


[impala] 01/04: IMPALA-10266: Identify FileSystem type based on the protocol scheme.

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4b5c66f329cdd818dd11cd1a9c68b58c84bcf45c
Author: Riza Suminto <ri...@cloudera.com>
AuthorDate: Wed Oct 21 11:00:54 2020 -0700

    IMPALA-10266: Identify FileSystem type based on the protocol scheme.
    
    The frontend identifies the type of a FileSystem in two ways. The
    first uses the instanceof operator against subclasses of
    org.apache.hadoop.fs.FileSystem. The second checks the FileSystem
    protocol scheme. This patch standardizes FileSystem identification
    on the scheme alone.
    
    Testing:
    - Add several tests in FileSystemUtilTest to check validity of some
      FileSystemUtil functions.
    - Run and pass core tests.
    
    Change-Id: I04492326a6e84895eef369fc11a3ec11f1536b6b
    Reviewed-on: http://gerrit.cloudera.org:8080/16628
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/LoadDataStmt.java   |  20 +--
 .../impala/catalog/CatalogServiceCatalog.java      |   4 +-
 .../org/apache/impala/common/FileSystemUtil.java   | 156 ++++++++++++++++-----
 .../org/apache/impala/service/JniFrontend.java     |  11 +-
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |   6 +-
 .../apache/impala/common/FileSystemUtilTest.java   | 137 +++++++++++++++++-
 6 files changed, 265 insertions(+), 69 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
index b2daea8..5aa7b42 100644
--- a/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/LoadDataStmt.java
@@ -23,13 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.adl.AdlFileSystem;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
-import org.apache.hadoop.fs.ozone.OzoneFileSystem;
 import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
@@ -147,14 +141,10 @@ public class LoadDataStmt extends StatementBase {
     try {
       Path source = sourceDataPath_.getPath();
       FileSystem fs = source.getFileSystem(FileSystemUtil.getConfiguration());
-      if (!(fs instanceof DistributedFileSystem) && !(fs instanceof S3AFileSystem) &&
-          !(fs instanceof AzureBlobFileSystem) &&
-          !(fs instanceof SecureAzureBlobFileSystem) &&
-          !(fs instanceof AdlFileSystem) &&
-          !(fs instanceof OzoneFileSystem)) {
+      if (!FileSystemUtil.isValidLoadDataInpath(fs)) {
         throw new AnalysisException(String.format("INPATH location '%s' "
-                + "must point to an HDFS, S3A, ADL, ABFS, or Ozone filesystem.",
-            sourceDataPath_));
+                + "must point to one of the supported filesystem URI scheme (%s).",
+            sourceDataPath_, FileSystemUtil.getValidLoadDataInpathSchemes()));
       }
       if (!fs.exists(source)) {
         throw new AnalysisException(String.format(
@@ -166,8 +156,8 @@ public class LoadDataStmt extends StatementBase {
       // its parent directory (in order to delete the file as part of the move operation).
       FsPermissionChecker checker = FsPermissionChecker.getInstance();
       // TODO: Disable permission checking for S3A as well (HADOOP-13892)
-      boolean shouldCheckPerms = !(fs instanceof AdlFileSystem ||
-        fs instanceof AzureBlobFileSystem || fs instanceof SecureAzureBlobFileSystem);
+      boolean shouldCheckPerms =
+          FileSystemUtil.FsType.getFsType(fs.getScheme()) != FileSystemUtil.FsType.ADLS;
 
       if (fs.isDirectory(source)) {
         if (FileSystemUtil.getTotalNumVisibleFiles(source) == 0) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 38aed8f..7fee648 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -321,7 +321,7 @@ public class CatalogServiceCatalog extends Catalog {
     try {
       // We want only 'true' HDFS filesystems to poll the HDFS cache (i.e not S3,
       // local, etc.)
-      if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
+      if (FileSystemUtil.isDistributedFileSystem(FileSystemUtil.getDefaultFileSystem())) {
         cachePoolReader_.scheduleAtFixedRate(
             new CachePoolReader(false), 0, 1, TimeUnit.MINUTES);
       }
@@ -1732,7 +1732,7 @@ public class CatalogServiceCatalog extends Catalog {
     try {
       // We want only 'true' HDFS filesystems to poll the HDFS cache (i.e not S3,
       // local, etc.)
-      if (FileSystemUtil.getDefaultFileSystem() instanceof DistributedFileSystem) {
+      if (FileSystemUtil.isDistributedFileSystem(FileSystemUtil.getDefaultFileSystem())) {
         CachePoolReader reader = new CachePoolReader(true);
         reader.run();
       }
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index ea01c6a..89a26a5 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -20,19 +20,17 @@ package org.apache.impala.common;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.adl.AdlFileSystem;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
 import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
-import org.apache.hadoop.fs.ozone.OzoneFileSystem;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
@@ -47,6 +45,7 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.Stack;
 import java.util.UUID;
 
@@ -58,6 +57,65 @@ public class FileSystemUtil {
   private static final Configuration CONF = new Configuration();
   private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtil.class);
 
+  public static final String SCHEME_ABFS = "abfs";
+  public static final String SCHEME_ABFSS = "abfss";
+  public static final String SCHEME_ADL = "adl";
+  public static final String SCHEME_FILE = "file";
+  public static final String SCHEME_HDFS = "hdfs";
+  public static final String SCHEME_S3A = "s3a";
+  public static final String SCHEME_O3FS = "o3fs";
+  public static final String SCHEME_ALLUXIO = "alluxio";
+
+  /**
+   * Set containing all FileSystem scheme that known to supports storage UUIDs in
+   * BlockLocation calls.
+   */
+  private static final Set<String> SCHEME_SUPPORT_STORAGE_IDS =
+      ImmutableSet.<String>builder()
+          .add(SCHEME_HDFS)
+          .add(SCHEME_O3FS)
+          .add(SCHEME_ALLUXIO)
+          .build();
+
+  /**
+   * Set containing all FileSystem scheme that is writeable by Impala.
+   */
+  private static final Set<String> SCHEME_WRITEABLE_BY_IMPALA =
+      ImmutableSet.<String>builder()
+          .add(SCHEME_ABFS)
+          .add(SCHEME_ABFSS)
+          .add(SCHEME_ADL)
+          .add(SCHEME_FILE)
+          .add(SCHEME_HDFS)
+          .add(SCHEME_S3A)
+          .add(SCHEME_O3FS)
+          .build();
+
+  /**
+   * Set containing all FileSystem scheme that is supported as Impala default FileSystem.
+   */
+  private static final Set<String> SCHEME_SUPPORTED_AS_DEFAULT_FS =
+      ImmutableSet.<String>builder()
+          .add(SCHEME_ABFS)
+          .add(SCHEME_ABFSS)
+          .add(SCHEME_ADL)
+          .add(SCHEME_HDFS)
+          .add(SCHEME_S3A)
+          .build();
+
+  /**
+   * Set containing all FileSystem scheme that is valid as INPATH for LOAD DATA statement.
+   */
+  private static final Set<String> SCHEME_VALID_FOR_LOAD_INPATH =
+      ImmutableSet.<String>builder()
+          .add(SCHEME_ABFS)
+          .add(SCHEME_ABFSS)
+          .add(SCHEME_ADL)
+          .add(SCHEME_HDFS)
+          .add(SCHEME_S3A)
+          .add(SCHEME_O3FS)
+          .build();
+
   /**
    * Performs a non-recursive delete of all visible (non-hidden) files in a given
    * directory. Returns the number of files deleted as part of this operation.
@@ -309,40 +367,35 @@ public class FileSystemUtil {
    * Returns true if the filesystem supports storage UUIDs in BlockLocation calls.
    */
   public static boolean supportsStorageIds(FileSystem fs) {
-    // Common case.
-    if (isDistributedFileSystem(fs)) return true;
-    // Blacklist FileSystems that are known to not to include storage UUIDs.
-    return !(fs instanceof S3AFileSystem || fs instanceof LocalFileSystem ||
-        fs instanceof AzureBlobFileSystem || fs instanceof SecureAzureBlobFileSystem ||
-        fs instanceof AdlFileSystem);
+    return SCHEME_SUPPORT_STORAGE_IDS.contains(fs.getScheme());
   }
 
   /**
    * Returns true iff the filesystem is a S3AFileSystem.
    */
   public static boolean isS3AFileSystem(FileSystem fs) {
-    return fs instanceof S3AFileSystem;
+    return hasScheme(fs, SCHEME_S3A);
   }
 
   /**
    * Returns true iff the path is on a S3AFileSystem.
    */
   public static boolean isS3AFileSystem(Path path) throws IOException {
-    return isS3AFileSystem(path.getFileSystem(CONF));
+    return hasScheme(path, SCHEME_S3A);
   }
 
   /**
    * Returns true iff the filesystem is AdlFileSystem.
    */
   public static boolean isADLFileSystem(FileSystem fs) {
-    return fs instanceof AdlFileSystem;
+    return hasScheme(fs, SCHEME_ADL);
   }
 
   /**
    * Returns true iff the path is on AdlFileSystem.
    */
   public static boolean isADLFileSystem(Path path) throws IOException {
-    return isADLFileSystem(path.getFileSystem(CONF));
+    return hasScheme(path, SCHEME_ADL);
   }
 
   /**
@@ -353,8 +406,7 @@ public class FileSystemUtil {
    * wire encryption but that does not matter in usages of this function.
    */
   public static boolean isABFSFileSystem(FileSystem fs) {
-    return fs instanceof AzureBlobFileSystem
-        || fs instanceof SecureAzureBlobFileSystem;
+    return hasScheme(fs, SCHEME_ABFS) || hasScheme(fs, SCHEME_ABFSS);
   }
 
   /**
@@ -362,49 +414,63 @@ public class FileSystemUtil {
    * SecureAzureBlobFileSystem.
    */
   public static boolean isABFSFileSystem(Path path) throws IOException {
-    return isABFSFileSystem(path.getFileSystem(CONF));
+    return hasScheme(path, SCHEME_ABFS) || hasScheme(path, SCHEME_ABFSS);
   }
 
   /**
    * Returns true iff the filesystem is an instance of LocalFileSystem.
    */
   public static boolean isLocalFileSystem(FileSystem fs) {
-    return fs instanceof LocalFileSystem;
+    return hasScheme(fs, SCHEME_FILE);
   }
 
   /**
    * Return true iff path is on a local filesystem.
    */
   public static boolean isLocalFileSystem(Path path) throws IOException {
-    return isLocalFileSystem(path.getFileSystem(CONF));
+    return hasScheme(path, SCHEME_FILE);
   }
 
   /**
    * Returns true iff the filesystem is a DistributedFileSystem.
    */
   public static boolean isDistributedFileSystem(FileSystem fs) {
-    return fs instanceof DistributedFileSystem;
+    return hasScheme(fs, SCHEME_HDFS);
   }
 
   /**
    * Return true iff path is on a DFS filesystem.
    */
   public static boolean isDistributedFileSystem(Path path) throws IOException {
-    return isDistributedFileSystem(path.getFileSystem(CONF));
+    return hasScheme(path, SCHEME_HDFS);
   }
 
   /**
    * Returns true iff the filesystem is a OzoneFileSystem.
    */
   public static boolean isOzoneFileSystem(FileSystem fs) {
-    return fs instanceof OzoneFileSystem;
+    return hasScheme(fs, SCHEME_O3FS);
   }
 
   /**
    * Returns true iff the path is on OzoneFileSystem.
    */
   public static boolean isOzoneFileSystem(Path path) throws IOException {
-    return isOzoneFileSystem(path.getFileSystem(CONF));
+    return hasScheme(path, SCHEME_O3FS);
+  }
+
+  /**
+   * Returns true if the filesystem protocol match the scheme.
+   */
+  private static boolean hasScheme(FileSystem fs, String scheme) {
+    return scheme.equals(fs.getScheme());
+  }
+
+  /**
+   * Returns true if the given path match the scheme.
+   */
+  private static boolean hasScheme(Path path, String scheme) {
+    return scheme.equals(path.toUri().getScheme());
   }
 
   /**
@@ -429,14 +495,14 @@ public class FileSystemUtil {
 
     private static final Map<String, FsType> SCHEME_TO_FS_MAPPING =
         ImmutableMap.<String, FsType>builder()
-            .put("abfs", ADLS)
-            .put("abfss", ADLS)
-            .put("adl", ADLS)
-            .put("file", LOCAL)
-            .put("hdfs", HDFS)
-            .put("s3a", S3)
-            .put("o3fs", OZONE)
-            .put("alluxio", ALLUXIO)
+            .put(SCHEME_ABFS, ADLS)
+            .put(SCHEME_ABFSS, ADLS)
+            .put(SCHEME_ADL, ADLS)
+            .put(SCHEME_FILE, LOCAL)
+            .put(SCHEME_HDFS, HDFS)
+            .put(SCHEME_S3A, S3)
+            .put(SCHEME_O3FS, OZONE)
+            .put(SCHEME_ALLUXIO, ALLUXIO)
             .build();
 
     /**
@@ -548,12 +614,30 @@ public class FileSystemUtil {
   public static boolean isImpalaWritableFilesystem(String location)
       throws IOException {
     Path path = new Path(location);
-    return (FileSystemUtil.isDistributedFileSystem(path) ||
-        FileSystemUtil.isLocalFileSystem(path) ||
-        FileSystemUtil.isS3AFileSystem(path) ||
-        FileSystemUtil.isABFSFileSystem(path) ||
-        FileSystemUtil.isADLFileSystem(path) ||
-        FileSystemUtil.isOzoneFileSystem(path));
+    String scheme = path.toUri().getScheme();
+    return SCHEME_WRITEABLE_BY_IMPALA.contains(scheme);
+  }
+
+  /**
+   * Returns true iff the given filesystem is supported as Impala default filesystem.
+   */
+  public static boolean isValidDefaultFileSystem(FileSystem fs) {
+    return SCHEME_SUPPORTED_AS_DEFAULT_FS.contains(fs.getScheme());
+  }
+
+  /**
+   * Returns true iff the given filesystem is valid as INPATH for LOAD DATA statement.
+   */
+  public static boolean isValidLoadDataInpath(FileSystem fs) {
+    return SCHEME_VALID_FOR_LOAD_INPATH.contains(fs.getScheme());
+  }
+
+  /**
+   * Return list of FileSystem protocol scheme that is valid as INPATH for LOAD DATA
+   * statement, delimited by comma.
+   */
+  public static String getValidLoadDataInpathSchemes() {
+    return String.join(", ", SCHEME_VALID_FOR_LOAD_INPATH);
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index 324105d..0eacf29 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -26,12 +26,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.adl.AdlFileSystem;
-import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.SecureAzureBlobFileSystem;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.JniBasedUnixGroupsMappingWithFallback;
 import org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMappingWithFallback;
@@ -811,11 +806,7 @@ public class JniFrontend {
   private String checkFileSystem(Configuration conf) {
     try {
       FileSystem fs = FileSystem.get(CONF);
-      if (!(fs instanceof DistributedFileSystem ||
-            fs instanceof S3AFileSystem ||
-            fs instanceof AzureBlobFileSystem ||
-            fs instanceof SecureAzureBlobFileSystem ||
-            fs instanceof AdlFileSystem)) {
+      if (!FileSystemUtil.isValidDefaultFileSystem(fs)) {
         return "Currently configured default filesystem: " +
             fs.getClass().getSimpleName() + ". " +
             CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY +
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index bbce69b..f523c90 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -35,6 +35,7 @@ import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.thrift.TFunctionCategory;
 import org.junit.Assert;
@@ -3875,8 +3876,9 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       // Source must be HDFS or S3A.
       AnalysisError(String.format("load data inpath '%s' %s into table " +
           "tpch.lineitem", "file:///test-warehouse/test.out", overwrite),
-          "INPATH location 'file:/test-warehouse/test.out' must point to an " +
-          "HDFS, S3A, ADL, ABFS, or Ozone filesystem.");
+          "INPATH location 'file:/test-warehouse/test.out' must point to one of the " +
+          "supported filesystem URI scheme (" +
+          FileSystemUtil.getValidLoadDataInpathSchemes() + ").");
 
       // File type / table type mismatch.
       AnalyzesOk(String.format("load data inpath '%s' %s into table " +
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index da5e11e..2ed60f6 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -24,10 +24,13 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
+
 /**
  * Tests for the various util methods in FileSystemUtil class
  */
@@ -86,10 +89,80 @@ public class FileSystemUtilTest {
   }
 
   @Test
-  public void testAlluxioFsType() {
-    Path path = new Path("alluxio://zk@zk-1:2181,zk-2:2181,zk-3:2181/path/");
-    assertEquals(FileSystemUtil.FsType.ALLUXIO,
-        FileSystemUtil.FsType.getFsType(path.toUri().getScheme()));
+  public void testFsType() throws IOException {
+    testFsType(mockLocation(FileSystemUtil.SCHEME_ABFS), FileSystemUtil.FsType.ADLS);
+    testFsType(mockLocation(FileSystemUtil.SCHEME_ABFSS), FileSystemUtil.FsType.ADLS);
+    testFsType(mockLocation(FileSystemUtil.SCHEME_ADL), FileSystemUtil.FsType.ADLS);
+    testFsType(mockLocation(FileSystemUtil.SCHEME_FILE), FileSystemUtil.FsType.LOCAL);
+    testFsType(mockLocation(FileSystemUtil.SCHEME_HDFS), FileSystemUtil.FsType.HDFS);
+    testFsType(mockLocation(FileSystemUtil.SCHEME_S3A), FileSystemUtil.FsType.S3);
+    testFsType(mockLocation(FileSystemUtil.SCHEME_O3FS), FileSystemUtil.FsType.OZONE);
+    testFsType(
+        mockLocation(FileSystemUtil.SCHEME_ALLUXIO), FileSystemUtil.FsType.ALLUXIO);
+  }
+
+  @Test
+  public void testSupportStorageIds() throws IOException {
+    testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFS), false);
+    testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ABFSS), false);
+    testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ADL), false);
+    testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_FILE), false);
+    testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_S3A), false);
+
+    testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_HDFS), true);
+
+    // The following tests are disabled because the underlying systems is not included
+    // in impala mini cluster.
+    // TODO: enable following tests if we add them into impala mini cluster.
+    // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_O3FS), true);
+    // testIsSupportStorageIds(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), true);
+  }
+
+  @Test
+  public void testWriteableByImpala() throws IOException {
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
+
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_ABFS), true);
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_ABFSS), true);
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_ADL), true);
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_FILE), true);
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_HDFS), true);
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_S3A), true);
+    testIsWritableByImpala(mockLocation(FileSystemUtil.SCHEME_O3FS), true);
+  }
+
+  @Test
+  public void testSupportedDefaultFs() throws IOException {
+    testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_ABFS), true);
+    testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_ABFSS), true);
+    testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_ADL), true);
+    testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_HDFS), true);
+    testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_S3A), true);
+
+    testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_FILE), false);
+
+    // The following tests are disabled because the underlying systems is not included
+    // in impala mini cluster.
+    // TODO: enable following tests if we add them into impala mini cluster.
+    // testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_O3FS), false);
+    // testIsSupportedDefaultFs(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
+  }
+
+  @Test
+  public void testValidLoadDataInpath() throws IOException {
+    testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_ABFS), true);
+    testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_ABFSS), true);
+    testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_ADL), true);
+    testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_HDFS), true);
+    testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_S3A), true);
+
+    testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_FILE), false);
+
+    // The following tests are disabled because the underlying systems is not included
+    // in impala mini cluster.
+    // TODO: enable following tests if we add them into impala mini cluster.
+    // testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_O3FS), true);
+    // testValidLoadDataInpath(mockLocation(FileSystemUtil.SCHEME_ALLUXIO), false);
   }
 
   private boolean testIsInIgnoredDirectory(Path input) {
@@ -102,4 +175,60 @@ public class FileSystemUtilTest {
     Mockito.when(mockFileStatus.isDirectory()).thenReturn(isDir);
     return FileSystemUtil.isInIgnoredDirectory(TEST_TABLE_PATH, mockFileStatus);
   }
+
+  private String mockLocation(String scheme) throws IOException {
+    switch (scheme) {
+      case FileSystemUtil.SCHEME_ABFS:
+        return "abfs://dummy-fs@dummy-account.dfs.core.windows.net/dummy-part-1";
+      case FileSystemUtil.SCHEME_ABFSS:
+        return "abfss://dummy-fs@dummy-account.dfs.core.windows.net/dummy-part-2";
+      case FileSystemUtil.SCHEME_ADL:
+        return "adl://dummy-account.azuredatalakestore.net/dummy-part-3";
+      case FileSystemUtil.SCHEME_FILE:
+        return "file://tmp/dummy-part-4";
+      case FileSystemUtil.SCHEME_HDFS:
+        return "hdfs://localhost:20500/dummy-part-5";
+      case FileSystemUtil.SCHEME_S3A:
+        return "s3a://dummy-bucket/dummy-part-6";
+      case FileSystemUtil.SCHEME_O3FS:
+        return "o3fs://bucket.volume/key";
+      case FileSystemUtil.SCHEME_ALLUXIO:
+        return "alluxio://zk@zk-1:2181,zk-2:2181,zk-3:2181/path/";
+      default:
+        throw new IOException("FileSystem scheme is not supported!");
+    }
+  }
+
+  private void testFsType(String location, FileSystemUtil.FsType expected) {
+    Path path = new Path(location);
+    FileSystemUtil.FsType type =
+        FileSystemUtil.FsType.getFsType(path.toUri().getScheme());
+    assertEquals(type, expected);
+  }
+
+  private void testIsSupportStorageIds(String location, boolean expected)
+      throws IOException {
+    Path path = new Path(location);
+    FileSystem fs = FileSystemUtil.getFileSystemForPath(path);
+    assertEquals(FileSystemUtil.supportsStorageIds(fs), expected);
+  }
+
+  private void testIsWritableByImpala(String location, boolean expected)
+      throws IOException {
+    assertEquals(FileSystemUtil.isImpalaWritableFilesystem(location), expected);
+  }
+
+  private void testIsSupportedDefaultFs(String location, boolean expected)
+      throws IOException {
+    Path path = new Path(location);
+    FileSystem fs = FileSystemUtil.getFileSystemForPath(path);
+    assertEquals(FileSystemUtil.isValidDefaultFileSystem(fs), expected);
+  }
+
+  private void testValidLoadDataInpath(String location, boolean expected)
+      throws IOException {
+    Path path = new Path(location);
+    FileSystem fs = FileSystemUtil.getFileSystemForPath(path);
+    assertEquals(FileSystemUtil.isValidLoadDataInpath(fs), expected);
+  }
 }
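
The core idea of the change above, reduced to a minimal standalone Java
sketch (the class name, the hard-coded scheme set, and the use of
java.util.Set are illustrative; the actual patch keeps these sets in
FileSystemUtil and builds them with Guava ImmutableSet builders):

    import java.net.URI;
    import java.util.Set;

    // Sketch: classify a storage location by its URI scheme instead of
    // testing the FileSystem implementation class with instanceof.
    public class SchemeCheckSketch {
      // Schemes Impala can write to, mirroring SCHEME_WRITEABLE_BY_IMPALA above.
      private static final Set<String> WRITEABLE_SCHEMES =
          Set.of("abfs", "abfss", "adl", "file", "hdfs", "s3a", "o3fs");

      // Returns true if the location's URI scheme is in the writeable set.
      public static boolean isImpalaWritable(String location) {
        String scheme = URI.create(location).getScheme();
        return scheme != null && WRITEABLE_SCHEMES.contains(scheme);
      }

      public static void main(String[] args) {
        System.out.println(isImpalaWritable("s3a://dummy-bucket/dummy-part-6")); // true
        System.out.println(isImpalaWritable("alluxio://zk-1:2181/path/"));       // false
      }
    }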


[impala] 02/04: IMPALA-10210: Skip Authentication for connection from a trusted domain

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fd1ea147b3b49764bb2f2576b10d60d2bd65b916
Author: Bikramjeet Vig <bi...@gmail.com>
AuthorDate: Fri Oct 2 16:25:14 2020 -0700

    IMPALA-10210: Skip Authentication for connection from a trusted domain
    
    Adds the ability to skip authentication for connection requests
    originating from a trusted domain over the hs2 http endpoint and
    the http webserver endpoint. The trusted domain can be specified
    using the newly added "--trusted_domain" startup flag. Additionally,
    if the startup flag "--trusted_domain_use_xff_header" is set to true,
    Impala will switch to using the 'X-Forwarded-For' HTTP header to
    extract the origin address while attempting to check if the connection
    originated from a trusted domain.
    
    Other highlights:
    - This still requires the client to specify a username via a basic
      auth header.
    - To avoid looking up the hostname for every HTTP request, a cookie is
      returned on the first successful auth attempt and is then used for
      further communication on the same connection.
    
    Testing:
    Added tests for both the hs2 http endpoint and the webserver http endpoint
    
    Change-Id: I09234078e2314dbc3177d0e869ae028e216ca699
    Reviewed-on: http://gerrit.cloudera.org:8080/16542
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/rpc/CMakeLists.txt                          |   2 +-
 .../rpc/{cookie-util.cc => authentication-util.cc} |  47 +++++++++-
 .../rpc/{cookie-util.h => authentication-util.h}   |  14 +++
 be/src/rpc/authentication.cc                       |  67 +++++++++----
 be/src/transport/THttpServer.cpp                   |  31 +++++-
 be/src/transport/THttpServer.h                     |  25 ++++-
 be/src/util/webserver.cc                           |  74 ++++++++++++---
 be/src/util/webserver.h                            |  13 +++
 common/thrift/metrics.json                         |  22 +++++
 .../apache/impala/customcluster/LdapHS2Test.java   | 104 ++++++++++++++++++++-
 .../impala/customcluster/LdapWebserverTest.java    |  68 +++++++++++++-
 11 files changed, 422 insertions(+), 45 deletions(-)
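
The origin handling described in the commit message boils down to the
following standalone Java sketch (class and method names are illustrative;
the real logic is IsTrustedDomain() in be/src/rpc/authentication-util.cc,
which additionally reverse-resolves the origin when it parses as an IP
address):

    // Sketch: decide whether a connection origin belongs to a trusted domain.
    public class TrustedDomainSketch {
      // 'origin' is either the peer address or, when
      // --trusted_domain_use_xff_header is set, the X-Forwarded-For value:
      // a comma-separated list whose left-most entry is the originating client.
      public static boolean isTrustedDomain(String origin, String trustedDomain) {
        if (trustedDomain.isEmpty() || origin.isEmpty()) return false;
        // Take the left-most entry of the comma-separated list.
        String first = origin.split(",", 2)[0].trim();
        // Drop a trailing ":port"; this sketch only handles hostname origins,
        // whereas the real code reverse-resolves IP addresses to hostnames.
        String host = first.split(":", 2)[0];
        return host.endsWith(trustedDomain);
      }

      public static void main(String[] args) {
        System.out.println(isTrustedDomain(
            "client.example.com:52310, 10.0.0.5", "example.com")); // true
        System.out.println(isTrustedDomain(
            "other.host.net", "example.com"));                     // false
      }
    }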

diff --git a/be/src/rpc/CMakeLists.txt b/be/src/rpc/CMakeLists.txt
index 10ba6ac..73b0e0c 100644
--- a/be/src/rpc/CMakeLists.txt
+++ b/be/src/rpc/CMakeLists.txt
@@ -29,7 +29,7 @@ set_source_files_properties(${PLANNER_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 add_library(Rpc
   authentication.cc
   ${COMMON_PROTO_SRCS}
-  cookie-util.cc
+  authentication-util.cc
   impala-service-pool.cc
   ${PLANNER_PROTO_SRCS}
   rpc-mgr.cc
diff --git a/be/src/rpc/cookie-util.cc b/be/src/rpc/authentication-util.cc
similarity index 81%
rename from be/src/rpc/cookie-util.cc
rename to be/src/rpc/authentication-util.cc
index e488f30..e910557 100644
--- a/be/src/rpc/cookie-util.cc
+++ b/be/src/rpc/authentication-util.cc
@@ -15,13 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "rpc/cookie-util.h"
+#include "rpc/authentication-util.h"
 
 #include <gutil/strings/escaping.h>
+#include <gutil/strings/util.h>
 #include <gutil/strings/split.h>
 #include <gutil/strings/strcat.h>
 #include <gutil/strings/strip.h>
 
+#include "kudu/util/net/sockaddr.h"
 #include "util/network-util.h"
 #include "util/openssl-util.h"
 #include "util/string-parser.h"
@@ -165,4 +167,47 @@ string GetDeleteCookie() {
   return Substitute("$0=;HttpOnly;Max-Age=0", COOKIE_NAME);
 }
 
+bool IsTrustedDomain(const std::string& origin, const std::string& trusted_domain) {
+  if (trusted_domain.empty()) return false;
+  vector<string> split = Split(origin, delimiter::Limit(",", 1));
+  if (split.empty()) return false;
+  kudu::Sockaddr sock_addr;
+  kudu::Status s = sock_addr.ParseString(split[0], 0);
+  string host_name;
+  if (!s.ok()) {
+    VLOG(2) << "Origin address did not parse as a valid IP address. Assuming it to be a "
+               "domain name. Reason: " << s.ToString();
+    // Remove port if its a part of the origin.
+    vector<string> host_n_port = Split(split[0], delimiter::Limit(":", 1));
+    host_name = host_n_port[0];
+  } else {
+    s = sock_addr.LookupHostname(&host_name);
+    if (!s.ok()) {
+      LOG(ERROR) << "DNS reverse-lookup failed for " << split[0]
+                 << " Error: " << s.ToString();
+      return false;
+    }
+  }
+  return HasSuffixString(host_name, trusted_domain);
+}
+
+Status BasicAuthExtractCredentials(
+    const string& token, string& username, string& password) {
+  if (token.empty()) {
+    return Status::Expected("Empty token");
+  }
+  string decoded;
+  if (!Base64Unescape(token, &decoded)) {
+    return Status::Expected("Failed to decode base64 basic authentication token.");
+  }
+  std::size_t colon = decoded.find(':');
+  if (colon == std::string::npos) {
+    return Status::Expected("Invalid basic authentication token format, must be in the "
+                            "form '<username>:<password>'");
+  }
+  username = decoded.substr(0, colon);
+  password = decoded.substr(colon + 1);
+  return Status::OK();
+}
+
 } // namespace impala
diff --git a/be/src/rpc/cookie-util.h b/be/src/rpc/authentication-util.h
similarity index 65%
rename from be/src/rpc/cookie-util.h
rename to be/src/rpc/authentication-util.h
index 5482087..c07b7cc 100644
--- a/be/src/rpc/cookie-util.h
+++ b/be/src/rpc/authentication-util.h
@@ -37,4 +37,18 @@ std::string GenerateCookie(const std::string& username, const AuthenticationHash
 // to the client that the cookie should be deleted.
 std::string GetDeleteCookie();
 
+// Takes a comma separated list of ip address/hostname with or without a port, picks the
+// left most on the list (this assumes the list follows the http standard for
+// 'X-Forwarded-For' header where the left-most IP address is the IP address of the
+// originating client), does a reverse DNS lookup on it if its a valid ip address and
+// finally checks if it originates from the 'trusted_domain'. Returns true if the origin
+// is from the trusted domain.
+bool IsTrustedDomain(const std::string& origin, const std::string& trusted_domain);
+
+// Takes in the base64 encoded token and returns the username and password via the input
+// arguments. Returns an OK status if the token is a valid base64 encoded string of the
+// form <username>:<password>, an error status otherwise.
+Status BasicAuthExtractCredentials(
+    const string& token, string& username, string& password);
+
 } // namespace impala
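
BasicAuthExtractCredentials() declared above splits a base64 Basic token
into username and password; a minimal Java equivalent (helper and class
names are illustrative) is shown below. It matches the header used later
in LdapHS2Test, where "Basic VGVzdDFMZGFwOjEyMzQ1" carries the
credentials "Test1Ldap:12345".

    import java.nio.charset.StandardCharsets;
    import java.util.Base64;

    // Sketch: extract <username>:<password> from a base64 Basic auth token.
    public class BasicAuthSketch {
      public static String[] extractCredentials(String token) {
        String decoded =
            new String(Base64.getDecoder().decode(token), StandardCharsets.UTF_8);
        int colon = decoded.indexOf(':');
        if (colon < 0) {
          throw new IllegalArgumentException("Invalid basic authentication token "
              + "format, must be in the form '<username>:<password>'");
        }
        return new String[] {decoded.substring(0, colon), decoded.substring(colon + 1)};
      }

      public static void main(String[] args) {
        String[] creds = extractCredentials("VGVzdDFMZGFwOjEyMzQ1");
        System.out.println(creds[0]); // Test1Ldap
        System.out.println(creds[1]); // 12345
      }
    }
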
diff --git a/be/src/rpc/authentication.cc b/be/src/rpc/authentication.cc
index e188f86..1b3f82c 100644
--- a/be/src/rpc/authentication.cc
+++ b/be/src/rpc/authentication.cc
@@ -44,7 +44,7 @@
 #include "kudu/security/gssapi.h"
 #include "kudu/security/init.h"
 #include "rpc/auth-provider.h"
-#include "rpc/cookie-util.h"
+#include "rpc/authentication-util.h"
 #include "rpc/thrift-server.h"
 #include "runtime/exec-env.h"
 #include "transport/THttpServer.h"
@@ -105,6 +105,18 @@ DEFINE_string(internal_principals_whitelist, "hdfs", "(Advanced) Comma-separated
     "'hdfs' which is the system user that in certain deployments must access "
     "catalog server APIs.");
 
+DEFINE_string(trusted_domain, "",
+    "If set, Impala will skip authentication for connections originating from this "
+    "domain. Currently, only connections over HTTP support this. Note: It still requires "
+    "the client to specify a username via the Basic Authorization header in the format "
+    "<username>:<password> where the password is not used and can be left blank.");
+
+DEFINE_bool(trusted_domain_use_xff_header, false,
+    "If set to true, this uses the 'X-Forwarded-For' HTML header to check for origin "
+    "while attempting to verify if the connection request originated from a trusted "
+    "domain. Only used if '--trusted_domain' is specified. Warning: Only use this if you "
+    "trust the incoming connection to have this set correctly.");
+
 namespace impala {
 
 // Sasl callbacks.  Why are these here?  Well, Sasl isn't that bright, and
@@ -446,28 +458,40 @@ bool CookieAuth(ThriftServer::ConnectionContext* connection_context,
   return false;
 }
 
-bool BasicAuth(ThriftServer::ConnectionContext* connection_context,
-    const AuthenticationHash& hash, const std::string& base64) {
-  if (base64.empty()) {
-    connection_context->return_headers.push_back("WWW-Authenticate: Basic");
-    return false;
-  }
-  string decoded;
-  if (!Base64Unescape(base64, &decoded)) {
-    LOG(ERROR) << "Failed to decode base64 auth string from: "
-               << TNetworkAddressToString(connection_context->network_address);
-    connection_context->return_headers.push_back("WWW-Authenticate: Basic");
+bool TrustedDomainCheck(ThriftServer::ConnectionContext* connection_context,
+    const AuthenticationHash& hash, const std::string& origin, string auth_header) {
+  if (!IsTrustedDomain(origin, FLAGS_trusted_domain)) return false;
+  string stripped_basic_auth_token;
+  StripWhiteSpace(&auth_header);
+  bool got_basic_auth =
+      TryStripPrefixString(auth_header, "Basic ", &stripped_basic_auth_token);
+  string basic_auth_token = got_basic_auth ? move(stripped_basic_auth_token) : "";
+  string username, password;
+  Status status = BasicAuthExtractCredentials(basic_auth_token, username, password);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error parsing basic authentication token from: "
+               << TNetworkAddressToString(connection_context->network_address)
+               << " Error: " << status;
     return false;
   }
-  std::size_t colon = decoded.find(':');
-  if (colon == std::string::npos) {
-    LOG(ERROR) << "Auth string must be in the form '<username>:<password>' from: "
-               << TNetworkAddressToString(connection_context->network_address);
+  connection_context->username = username;
+  // Create a cookie to return.
+  connection_context->return_headers.push_back(
+      Substitute("Set-Cookie: $0", GenerateCookie(username, hash)));
+  return true;
+}
+
+bool BasicAuth(ThriftServer::ConnectionContext* connection_context,
+    const AuthenticationHash& hash, const string& base64) {
+  string username, password;
+  Status status = BasicAuthExtractCredentials(base64, username, password);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error parsing basic authentication token from: "
+               << TNetworkAddressToString(connection_context->network_address)
+               << " Error: " << status;
     connection_context->return_headers.push_back("WWW-Authenticate: Basic");
     return false;
   }
-  string username = decoded.substr(0, colon);
-  string password = decoded.substr(colon + 1);
   bool ret = DoLdapCheck(username.c_str(), password.c_str(), password.length());
   if (ret) {
     // Authenication was successful, so set the username on the connection.
@@ -905,8 +929,9 @@ Status SecureAuthProvider::GetServerTransportFactory(
   if (underlying_transport_type == ThriftServer::HTTP) {
     bool has_kerberos = !principal_.empty();
     bool use_cookies = FLAGS_max_cookie_lifetime_s > 0;
-    factory->reset(new THttpServerTransportFactory(
-        server_name, metrics, has_ldap_, has_kerberos, use_cookies));
+    bool check_trusted_domain = !FLAGS_trusted_domain.empty();
+    factory->reset(new THttpServerTransportFactory(server_name, metrics, has_ldap_,
+        has_kerberos, use_cookies, check_trusted_domain));
     return Status::OK();
   }
 
@@ -1000,6 +1025,8 @@ void SecureAuthProvider::SetupConnectionContext(
       callbacks.return_headers_fn = std::bind(ReturnHeaders, connection_ptr.get());
       callbacks.cookie_auth_fn =
           std::bind(CookieAuth, connection_ptr.get(), hash_, std::placeholders::_1);
+      callbacks.trusted_domain_check_fn = std::bind(TrustedDomainCheck,
+          connection_ptr.get(), hash_, std::placeholders::_1, std::placeholders::_2);
       if (has_ldap_) {
         callbacks.basic_auth_fn =
             std::bind(BasicAuth, connection_ptr.get(), hash_, std::placeholders::_1);
diff --git a/be/src/transport/THttpServer.cpp b/be/src/transport/THttpServer.cpp
index a3b6712..0aed3c3 100644
--- a/be/src/transport/THttpServer.cpp
+++ b/be/src/transport/THttpServer.cpp
@@ -33,6 +33,8 @@
 
 #include "util/metrics.h"
 
+DECLARE_bool(trusted_domain_use_xff_header);
+
 namespace apache {
 namespace thrift {
 namespace transport {
@@ -41,10 +43,12 @@ using namespace std;
 using strings::Substitute;
 
 THttpServerTransportFactory::THttpServerTransportFactory(const std::string server_name,
-    impala::MetricGroup* metrics, bool has_ldap, bool has_kerberos, bool use_cookies)
+    impala::MetricGroup* metrics, bool has_ldap, bool has_kerberos, bool use_cookies,
+    bool check_trusted_domain)
   : has_ldap_(has_ldap),
     has_kerberos_(has_kerberos),
     use_cookies_(use_cookies),
+    check_trusted_domain_(check_trusted_domain),
     metrics_enabled_(metrics != nullptr) {
   if (metrics_enabled_) {
     if (has_ldap_) {
@@ -65,15 +69,21 @@ THttpServerTransportFactory::THttpServerTransportFactory(const std::string serve
       http_metrics_.total_cookie_auth_failure_ =
           metrics->AddCounter(Substitute("$0.total-cookie-auth-failure", server_name), 0);
     }
+    if (check_trusted_domain_) {
+      http_metrics_.total_trusted_domain_check_success_ = metrics->AddCounter(
+          Substitute("$0.total-trusted-domain-check-success", server_name), 0);
+    }
   }
 }
 
 THttpServer::THttpServer(boost::shared_ptr<TTransport> transport, bool has_ldap,
-    bool has_kerberos, bool use_cookies, bool metrics_enabled, HttpMetrics* http_metrics)
+    bool has_kerberos, bool use_cookies, bool check_trusted_domain, bool metrics_enabled,
+    HttpMetrics* http_metrics)
   : THttpTransport(transport),
     has_ldap_(has_ldap),
     has_kerberos_(has_kerberos),
     use_cookies_(use_cookies),
+    check_trusted_domain_(check_trusted_domain),
     metrics_enabled_(metrics_enabled),
     http_metrics_(http_metrics) {}
 
@@ -192,6 +202,23 @@ void THttpServer::headersDone() {
     }
   }
 
+  // Bypass auth for connections from trusted domains. Returns a cookie on the first
+  // successful auth attempt. This check is performed after checking for cookie to avoid
+  // subsequent reverse DNS lookups which can be unpredictably costly.
+  if (!authorized && check_trusted_domain_) {
+    string origin =
+        FLAGS_trusted_domain_use_xff_header ? origin_ : transport_->getOrigin();
+    StripWhiteSpace(&origin);
+    if (!origin.empty()) {
+      if (callbacks_.trusted_domain_check_fn(origin, auth_value_)) {
+        authorized = true;
+        if (metrics_enabled_) {
+          http_metrics_->total_trusted_domain_check_success_->Increment(1);
+        }
+      }
+    }
+  }
+
   // If cookie auth wasn't successful, try to auth with the 'Authorization' header.
   if (!authorized) {
     // Determine what type of auth header we got.
diff --git a/be/src/transport/THttpServer.h b/be/src/transport/THttpServer.h
index 11dc8ef..e916f43 100644
--- a/be/src/transport/THttpServer.h
+++ b/be/src/transport/THttpServer.h
@@ -42,6 +42,10 @@ struct HttpMetrics {
   // auth attempts.
   impala::IntCounter* total_cookie_auth_success_ = nullptr;
   impala::IntCounter* total_cookie_auth_failure_ = nullptr;
+
+  // If 'check_trusted_domain_' is true, metrics for the number of successful
+  // attempts to authorize connections originating from a trusted domain.
+  impala::IntCounter* total_trusted_domain_check_success_ = nullptr;
 };
 
 /*
@@ -77,10 +81,18 @@ public:
     // authentication is successful.
     std::function<bool(const std::string&)> cookie_auth_fn =
         [&](const std::string&) { return false; };
+
+    // Function that takes the connection's origin ip/hostname, and 'Authorization: Basic'
+    // header respectively and returns true if it determines that the connection
+    // originated from a trusted domain and if the basic auth header contains a valid
+    // username.
+    std::function<bool(const std::string&, std::string)> trusted_domain_check_fn =
+        [&](const std::string&, std::string) { return false; };
   };
 
   THttpServer(boost::shared_ptr<TTransport> transport, bool has_ldap, bool has_kerberos,
-      bool use_cookies, bool metrics_enabled, HttpMetrics* http_metrics);
+      bool use_cookies, bool check_trusted_domain, bool metrics_enabled,
+      HttpMetrics* http_metrics);
 
   virtual ~THttpServer();
 
@@ -117,6 +129,10 @@ protected:
   // The value from the 'Cookie' header.
   std::string cookie_value_ = "";
 
+  // If true, checks whether an incoming connection can skip auth if it originates from a
+  // trusted domain.
+  bool check_trusted_domain_ = false;
+
   bool metrics_enabled_ = false;
   HttpMetrics* http_metrics_ = nullptr;
 };
@@ -129,19 +145,20 @@ public:
  THttpServerTransportFactory() {}
 
  THttpServerTransportFactory(const std::string server_name, impala::MetricGroup* metrics,
-     bool has_ldap, bool has_kerberos, bool use_cookies);
+     bool has_ldap, bool has_kerberos, bool use_cookies, bool check_trusted_domain);
 
  virtual ~THttpServerTransportFactory() {}
 
  virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
-   return boost::shared_ptr<TTransport>(new THttpServer(
-       trans, has_ldap_, has_kerberos_, use_cookies_, metrics_enabled_, &http_metrics_));
+   return boost::shared_ptr<TTransport>(new THttpServer(trans, has_ldap_, has_kerberos_,
+       use_cookies_, check_trusted_domain_, metrics_enabled_, &http_metrics_));
   }
 
  private:
   bool has_ldap_ = false;
   bool has_kerberos_ = false;
   bool use_cookies_ = false;
+  bool check_trusted_domain_ = false;
 
   // Metrics for every transport produced by this factory.
   bool metrics_enabled_ = false;
diff --git a/be/src/util/webserver.cc b/be/src/util/webserver.cc
index a844047..75a1331 100644
--- a/be/src/util/webserver.cc
+++ b/be/src/util/webserver.cc
@@ -42,7 +42,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/net/sockaddr.h"
 #include "rpc/authentication.h"
-#include "rpc/cookie-util.h"
+#include "rpc/authentication-util.h"
 #include "rpc/thrift-util.h"
 #include "runtime/exec-env.h"
 #include "service/impala-server.h"
@@ -138,6 +138,8 @@ DECLARE_bool(ldap_passwords_in_clear_ok);
 DECLARE_int64(max_cookie_lifetime_s);
 DECLARE_string(ssl_minimum_version);
 DECLARE_string(ssl_cipher_list);
+DECLARE_string(trusted_domain);
+DECLARE_bool(trusted_domain_use_xff_header);
 
 static const char* DOC_FOLDER = "/www/";
 static const int DOC_FOLDER_LEN = strlen(DOC_FOLDER);
@@ -269,7 +271,8 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri
     context_(nullptr),
     error_handler_(UrlHandler(
         bind<void>(&Webserver::ErrorHandler, this, _1, _2), "error.tmpl", false)),
-    use_cookies_(FLAGS_max_cookie_lifetime_s > 0) {
+    use_cookies_(FLAGS_max_cookie_lifetime_s > 0),
+    check_trusted_domain_(!FLAGS_trusted_domain.empty()) {
   http_address_ = MakeNetworkAddress(interface.empty() ? "0.0.0.0" : interface, port);
   Init();
 
@@ -291,6 +294,11 @@ Webserver::Webserver(const string& interface, const int port, MetricGroup* metri
     total_cookie_auth_failure_ =
         metrics->AddCounter("impala.webserver.total-cookie-auth-failure", 0);
   }
+  if (check_trusted_domain_
+      && (auth_mode_ == AuthMode::SPNEGO || auth_mode_ == AuthMode::LDAP)) {
+    total_trusted_domain_check_success_ =
+        metrics->AddCounter("impala.webserver.total-trusted-domain-check-success", 0);
+  }
 }
 
 Webserver::~Webserver() {
@@ -597,6 +605,26 @@ sq_callback_result_t Webserver::BeginRequestCallback(struct sq_connection* conne
     }
   }
 
+  // Connections originating from trusted domains should not require authentication.
+  // Returns a cookie on the first successful auth attempt. This check is performed after
+  // checking for cookie to avoid subsequent reverse DNS lookups which can be
+  // unpredictably costly.
+  if (!authenticated && check_trusted_domain_) {
+    const char* xff_origin = sq_get_header(connection, "X-Forwarded-For");
+    string xff_origin_string = !xff_origin ? "" : string(xff_origin);
+    string origin = FLAGS_trusted_domain_use_xff_header ?
+        xff_origin_string :
+        GetRemoteAddress(request_info).ToString();
+    StripWhiteSpace(&origin);
+    if (!origin.empty()) {
+      if (TrustedDomainCheck(origin, connection, request_info)) {
+        total_trusted_domain_check_success_->Increment(1);
+        authenticated = true;
+        AddCookie(request_info, &response_headers);
+      }
+    }
+  }
+
   if (!authenticated) {
     if (auth_mode_ == AuthMode::SPNEGO) {
       sq_callback_result_t spnego_result =
@@ -764,6 +792,31 @@ sq_callback_result_t Webserver::HandleSpnego(struct sq_connection* connection,
   return SQ_CONTINUE_HANDLING;
 }
 
+bool Webserver::TrustedDomainCheck(const string& origin, struct sq_connection* connection,
+    struct sq_request_info* request_info) {
+  if (!IsTrustedDomain(origin, FLAGS_trusted_domain)) return false;
+  const char* authz_header = sq_get_header(connection, "Authorization");
+  if (!authz_header) {
+    LOG(ERROR) << "Passed TrustedDomainCheck but no Authorization header provided.";
+    return false;
+  }
+
+  string base64;
+  if (!TryStripPrefixString(authz_header, "Basic ", &base64)) {
+    LOG(ERROR) << "Passed TrustedDomainCheck but No Basic authentication provided.";
+    return false;
+  }
+  string username, password;
+  Status status = BasicAuthExtractCredentials(base64, username, password);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error parsing basic authentication token from: "
+               << GetRemoteAddress(request_info).ToString() << " Error: " << status;
+    return false;
+  }
+  request_info->remote_user = strdup(username.c_str());
+  return true;
+}
+
 Status Webserver::HandleBasic(struct sq_connection* connection,
     struct sq_request_info* request_info, vector<string>* response_headers) {
   const char* authz_header = sq_get_header(connection, "Authorization");
@@ -775,18 +828,13 @@ Status Webserver::HandleBasic(struct sq_connection* connection,
   if (!TryStripPrefixString(authz_header, "Basic ", &base64)) {
     return Status::Expected("No Basic authentication provided.");
   }
-
-  string decoded;
-  if (!Base64Unescape(base64, &decoded)) {
-    return Status::Expected("Failed to decode base64 basic authentication token.");
-  }
-
-  std::size_t colon = decoded.find(':');
-  if (colon == std::string::npos) {
-    return Status::Expected("Invalid basic authentication token format, missing ':'.");
+  string username, password;
+  Status status = BasicAuthExtractCredentials(base64, username, password);
+  if (!status.ok()) {
+    LOG(ERROR) << "Error parsing basic authentication token from: "
+               << GetRemoteAddress(request_info).ToString() << " Error: " << status;
+    return status;
   }
-  string username = decoded.substr(0, colon);
-  string password = decoded.substr(colon + 1);
   if (ldap_->LdapCheckPass(username.c_str(), password.c_str(), password.length())
       && ldap_->LdapCheckFilters(username)) {
     request_info->remote_user = strdup(username.c_str());
diff --git a/be/src/util/webserver.h b/be/src/util/webserver.h
index a4981f3..96698c1 100644
--- a/be/src/util/webserver.h
+++ b/be/src/util/webserver.h
@@ -196,6 +196,11 @@ class Webserver {
   sq_callback_result_t HandleSpnego(struct sq_connection* connection,
       struct sq_request_info* request_info, std::vector<std::string>* response_headers);
 
+  /// Checks and returns true if the connection originated from a trusted domain and has a
+  /// valid username set in the request's the Authorization header (using Basic Auth).
+  bool TrustedDomainCheck(const std::string& origin, struct sq_connection* connection,
+      struct sq_request_info* request_info);
+
   // Handle Basic authentication for this request. Returns an error if authentication was
   // unsuccessful.
   Status HandleBasic(struct sq_connection* connection,
@@ -258,6 +263,10 @@ class Webserver {
   /// If true and SPNEGO is in use, cookies will be used for authentication.
   bool use_cookies_;
 
+  /// If true and SPNEGO or LDAP is in use, checks whether an incoming connection can skip
+  /// auth if it originates from a trusted domain.
+  bool check_trusted_domain_;
+
   /// Used to validate usernames/passwords If LDAP authentication is in use.
   std::unique_ptr<ImpalaLdap> ldap_;
 
@@ -275,6 +284,10 @@ class Webserver {
   /// auth attempts.
   IntCounter* total_cookie_auth_success_ = nullptr;
   IntCounter* total_cookie_auth_failure_ = nullptr;
+
+  /// If 'use_cookies_' is true, metrics for the number of successful
+  /// attempts to authorize connections originating from a trusted domain.
+  IntCounter* total_trusted_domain_check_success_ = nullptr;
 };
 
 }
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 69029db..560a494 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -1282,6 +1282,16 @@
     "key": "impala.thrift-server.hiveserver2-http-frontend.total-cookie-auth-failure"
   },
   {
+    "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that skipped authentication as they originated from a trusted domain.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "HiveServer2 HTTP API Connection Trusted Domain Check Success",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.thrift-server.hiveserver2-http-frontend.total-trusted-domain-check-success"
+  },
+  {
     "description": "The number of HiveServer2 HTTP API connection requests to this Impala Daemon that were successfully authenticated with Kerberos",
     "contexts": [
       "IMPALAD"
@@ -2713,6 +2723,18 @@
     "kind": "COUNTER",
     "key": "impala.webserver.total-cookie-auth-failure"
   },
+    {
+    "description": "The number of HTTP connection requests to this daemon's webserver that originated from a trusted domain.",
+    "contexts": [
+      "IMPALAD",
+      "CATALOGSERVER",
+      "STATESTORE"
+    ],
+    "label": "Webserver HTTP Connection Trusted Domain Check Success",
+    "units": "NONE",
+    "kind": "COUNTER",
+    "key": "impala.webserver.total-trusted-domain-check-success"
+  },
   {
     "description": "The number of times the FAIL debug action returned an error. For testing only.",
     "contexts": [
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
index c170b8e..91950aa 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapHS2Test.java
@@ -90,8 +90,8 @@ public class LdapHS2Test {
     TFetchResultsResp fetchResp = client.FetchResults(fetchReq);
     verifySuccess(fetchResp.getStatus());
     List<TColumn> columns = fetchResp.getResults().getColumns();
-    assertEquals(columns.size(), 1);
-    assertEquals(columns.get(0).getStringVal().getValues().get(0), expectedResult);
+    assertEquals(1, columns.size());
+    assertEquals(expectedResult, columns.get(0).getStringVal().getValues().get(0));
 
     return execResp.getOperationHandle();
   }
@@ -116,6 +116,13 @@ public class LdapHS2Test {
     assertEquals(expectedCookieAuthFailure, actualCookieAuthFailure);
   }
 
+  private void verifyTrustedDomainMetrics(long expectedAuthSuccess) throws Exception {
+    long actualAuthSuccess = (long) metrics
+        .getMetric("impala.thrift-server.hiveserver2-http-frontend."
+            + "total-trusted-domain-check-success");
+    assertEquals(expectedAuthSuccess, actualAuthSuccess);
+  }
+
   /**
    * Tests LDAP authentication to the HTTP hiveserver2 endpoint.
    */
@@ -304,4 +311,97 @@ public class LdapHS2Test {
     openResp = client.OpenSession(openReq);
     assertEquals(openResp.getStatus().getStatusCode(), TStatusCode.ERROR_STATUS);
   }
+
+  /**
+   * Tests if authentication is skipped when connections to the HTTP hiveserver2
+   * endpoint originate from a trusted domain.
+   */
+  @Test
+  public void testHiveserver2TrustedDomainAuth() throws Exception {
+    setUp("--trusted_domain=localhost --trusted_domain_use_xff_header=true");
+    verifyMetrics(0, 0);
+    THttpClient transport = new THttpClient("http://localhost:28000");
+    Map<String, String> headers = new HashMap<String, String>();
+
+    // Case 1: Authenticate as 'Test1Ldap' with the right password '12345'
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    transport.open();
+    TCLIService.Iface client = new TCLIService.Client(new TBinaryProtocol(transport));
+
+    // Open a session which will get username 'Test1Ldap'.
+    TOpenSessionReq openReq = new TOpenSessionReq();
+    TOpenSessionResp openResp = client.OpenSession(openReq);
+    // One successful authentication.
+    verifyMetrics(0, 0);
+    verifyTrustedDomainMetrics(1);
+    // Running a query should succeed.
+    TOperationHandle operationHandle = execAndFetch(
+        client, openResp.getSessionHandle(), "select logged_in_user()", "Test1Ldap");
+    // Two more successful authentications - for the Exec() and the Fetch().
+    verifyMetrics(0, 0);
+    verifyTrustedDomainMetrics(3);
+
+    // Case 2: Authenticate as 'Test1Ldap' without password
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOg==");
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    openResp = client.OpenSession(openReq);
+    verifyMetrics(0, 0);
+    verifyTrustedDomainMetrics(4);
+    operationHandle = execAndFetch(client, openResp.getSessionHandle(),
+        "select logged_in_user()", "Test1Ldap");
+    verifyMetrics(0, 0);
+    verifyTrustedDomainMetrics(6);
+
+    // Case 3: Authenticate as 'Test1Ldap' with the right password '12345'
+    // but with a non-trusted address in the X-Forwarded-For header
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    headers.put("X-Forwarded-For", "127.23.0.1");
+    transport.setCustomHeaders(headers);
+    openResp = client.OpenSession(openReq);
+    verifyMetrics(1, 0);
+    verifyTrustedDomainMetrics(6);
+    operationHandle = execAndFetch(client, openResp.getSessionHandle(),
+        "select logged_in_user()", "Test1Ldap");
+    verifyMetrics(3, 0);
+    verifyTrustedDomainMetrics(6);
+
+    // Case 4: No auth header, does not work
+    headers.remove("Authorization");
+    headers.put("X-Forwarded-For", "127.0.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Exception exception.");
+    } catch (Exception e) {
+      verifyTrustedDomainMetrics(6);
+      assertEquals(e.getMessage(), "HTTP Response code: 401");
+    }
+
+    // Case 5: Authenticate as 'Test1Ldap' with no password and a
+    // non-trusted address in the X-Forwarded-For header
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOg==");
+    headers.put("X-Forwarded-For", "127.23.0.1");
+    transport.setCustomHeaders(headers);
+    try {
+      openResp = client.OpenSession(openReq);
+      fail("Exception exception.");
+    } catch (Exception e) {
+      verifyMetrics(3, 1);
+      verifyTrustedDomainMetrics(6);
+      assertEquals(e.getMessage(), "HTTP Response code: 401");
+    }
+
+    // Case 6: Verify that the trusted domain check metrics do not change
+    // when the X-Forwarded-For header is not present
+    headers.put("Authorization", "Basic VGVzdDFMZGFwOjEyMzQ1");
+    headers.remove("X-Forwarded-For");
+    transport.setCustomHeaders(headers);
+    openResp = client.OpenSession(openReq);
+    // Account for 1 successful basic auth increment.
+    verifyMetrics(4, 1);
+    verifyTrustedDomainMetrics(6);
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
index 3d7ac73..6a8e2a7 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapWebserverTest.java
@@ -21,9 +21,10 @@ import static org.apache.impala.testutil.LdapUtil.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -37,7 +38,6 @@ import org.apache.directory.server.core.integ.CreateLdapServerRule;
 import org.apache.impala.util.Metrics;
 import org.apache.log4j.Logger;
 import org.junit.After;
-import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
@@ -97,6 +97,13 @@ public class LdapWebserverTest {
         expectedCookieFailure.contains(actualCookieFailure));
   }
 
+  private void verifyTrustedDomainMetrics(Range<Long> expectedSuccess) throws Exception {
+    long actualSuccess = (long) metrics_
+        .getMetric("impala.webserver.total-trusted-domain-check-success");
+    assertTrue("Expected: " + expectedSuccess + ", Actual: " + actualSuccess,
+        expectedSuccess.contains(actualSuccess));
+  }
+
   @Test
   public void testWebserver() throws Exception {
     setUp("", "");
@@ -194,4 +201,61 @@ public class LdapWebserverTest {
       assertTrue(result, result.contains("No URI handler for"));
     }
   }
+
+  @Test
+  public void testWebserverTrustedDomain() throws Exception {
+    setUp("--trusted_domain=localhost --trusted_domain_use_xff_header=true", "");
+
+    // Case 1: Authenticate as 'Test1Ldap' with the right password '12345'
+    attemptConnection("Basic VGVzdDFMZGFwOjEyMzQ1", "127.0.0.1");
+    verifyTrustedDomainMetrics(Range.closed(1L, 1L));
+
+    // Case 2: Authenticate as 'Test1Ldap' without password
+    attemptConnection("Basic VGVzdDFMZGFwOg==", "127.0.0.1");
+    verifyTrustedDomainMetrics(Range.closed(2L, 2L));
+
+    // Case 3: Authenticate as 'Test1Ldap' with the right password
+    // '12345' but with a non-trusted address in the X-Forwarded-For header
+    attemptConnection("Basic VGVzdDFMZGFwOjEyMzQ1", "127.0.23.1");
+    verifyTrustedDomainMetrics(Range.closed(2L, 2L));
+
+    // Case 4: No auth header, does not work
+    try {
+      attemptConnection(null, "127.0.0.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    verifyTrustedDomainMetrics(Range.closed(2L, 2L));
+
+    // Case 5: Authenticate as 'Test1Ldap' with no password and a
+    // non-trusted address in the X-Forwarded-For header
+    try {
+      attemptConnection("Basic VGVzdDFMZGFwOg==", "127.0.23.1");
+    } catch (IOException e) {
+      assertTrue(e.getMessage().contains("Server returned HTTP response code: 401"));
+    }
+    verifyTrustedDomainMetrics(Range.closed(2L, 2L));
+
+    // Case 6: Verify that the trusted domain check metrics do not change
+    // when the X-Forwarded-For header is not present
+    long successMetricBefore = (long) metrics_
+        .getMetric("impala.webserver.total-trusted-domain-check-success");
+    attemptConnection("Basic VGVzdDFMZGFwOjEyMzQ1", null);
+    verifyTrustedDomainMetrics(Range.closed(successMetricBefore, successMetricBefore));
+  }
+
+  // Helper method to make a GET call to the webserver using the given Basic
+  // auth token and X-Forwarded-For address.
+  private void attemptConnection(String basic_auth_token, String xff_address)
+      throws Exception {
+    String url = "http://localhost:25000/?json";
+    URLConnection connection = new URL(url).openConnection();
+    if (basic_auth_token != null) {
+      connection.setRequestProperty("Authorization", basic_auth_token);
+    }
+    if (xff_address != null) {
+      connection.setRequestProperty("X-Forwarded-For", xff_address);
+    }
+    connection.getInputStream();
+  }
 }
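
For reference, the trusted-domain flow that attemptConnection() exercises above can
be reproduced outside JUnit with a few lines of Python. The sketch below is a rough
equivalent, assuming a local impalad webserver on port 25000 started with
--trusted_domain=localhost and --trusted_domain_use_xff_header=true; the
X-Forwarded-For header is only consulted because that flag tells the server to
trust it.

    # Minimal sketch, assuming a local impalad webserver on port 25000 started
    # with --trusted_domain=localhost --trusted_domain_use_xff_header=true.
    import base64
    import urllib.request

    def attempt_connection(basic_auth_token=None, xff_address=None):
        # Mirrors LdapWebserverTest.attemptConnection(): GET the debug page with
        # an optional Basic auth header and an optional X-Forwarded-For header.
        req = urllib.request.Request("http://localhost:25000/?json")
        if basic_auth_token is not None:
            req.add_header("Authorization", basic_auth_token)
        if xff_address is not None:
            req.add_header("X-Forwarded-For", xff_address)
        return urllib.request.urlopen(req).read()

    # 'Test1Ldap' with an empty password is accepted only because the request
    # appears to originate from the trusted domain (compare Case 2 above).
    token = "Basic " + base64.b64encode(b"Test1Ldap:").decode()
    attempt_connection(token, "127.0.0.1")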


[impala] 04/04: IMPALA-10256: Skip test_disable_incremental_metadata_updates on S3 tests

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3ba8d637cdf38a68e25e573afa8d1d05047df2f6
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Tue Oct 20 10:28:11 2020 +0800

    IMPALA-10256: Skip test_disable_incremental_metadata_updates on S3 tests
    
    IMPALA-10113 adds a test that disables the incremental_metadata_updates
    flag to verify that metadata propagation still works correctly. The
    test invokes two test files that are also used in metadata/test_ddl.py.
    One of them exercises HDFS caching and should only be run on the HDFS
    file system, so we mark the test with "SkipIf.not_hdfs".
    
    Tests:
     - Run CORE test on S3 build.
    
    Change-Id: I0b922de84cff0a1e0771d5a8470bdd9f153f85f0
    Reviewed-on: http://gerrit.cloudera.org:8080/16616
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 tests/custom_cluster/test_disable_features.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/tests/custom_cluster/test_disable_features.py b/tests/custom_cluster/test_disable_features.py
index 1434711..a8d2c0b 100644
--- a/tests/custom_cluster/test_disable_features.py
+++ b/tests/custom_cluster/test_disable_features.py
@@ -19,6 +19,7 @@ import pytest
 
 from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
 from tests.common.parametrize import UniqueDatabase
+from tests.common.skip import SkipIf
 
 
 class TestDisableFeatures(CustomClusterTestSuite):
@@ -33,6 +34,7 @@ class TestDisableFeatures(CustomClusterTestSuite):
   def test_disable_orc_scanner(self, vector):
     self.run_test_case('QueryTest/disable-orc-scanner', vector)
 
+  @SkipIf.not_hdfs
   @pytest.mark.execute_serially
   @UniqueDatabase.parametrize(sync_ddl=True)
   @CustomClusterTestSuite.with_args(
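
For readers unfamiliar with the marker, helpers like SkipIf.not_hdfs in
tests/common/skip.py are conventionally thin pytest.mark.skipif wrappers keyed on
the target filesystem. A rough sketch of the pattern is below; the environment
variable name and its default are assumptions for illustration, not the literal
Impala definition.

    # Rough sketch of a filesystem-gated skip marker in the style of
    # tests/common/skip.py. The TARGET_FILESYSTEM variable and its default are
    # assumptions for illustration only.
    import os
    import pytest

    TARGET_FILESYSTEM = os.environ.get("TARGET_FILESYSTEM", "hdfs")

    class SkipIf:
        # Tests tagged with this marker only run against HDFS, e.g. the
        # HDFS-caching test that has no meaning on S3 or other object stores.
        not_hdfs = pytest.mark.skipif(
            TARGET_FILESYSTEM != "hdfs", reason="runs only on HDFS")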


[impala] 03/04: IMPALA-9870: impala-shell 'summary' to show original and retried queries

Posted by st...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 2a1d3acaf19345707a0e2bc9d73e0c1c9204b48b
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Thu Sep 24 15:08:05 2020 +0800

    IMPALA-9870: impala-shell 'summary' to show original and retried queries
    
    This patch extends the 'summary' command of impala-shell to support
    retrieving the summary of the original query attempt. The new syntax is
    
    SUMMARY [ALL | LATEST | ORIGINAL]
    
    If 'ALL' is specified, both the latest and original summaries are
    printed. If 'LATEST' is specified, only the summary of the latest query
    attempt is printed. If 'ORIGINAL' is specified, only the summary of the
    original query attempt is printed. The default option is 'LATEST'.
    Support for this has only been added to HS2 given that Beeswax is being
    deprecated soon.
    
    Tests:
     - Add new tests in test_shell_interactive.py
    
    Change-Id: I8605dd0eb2d3a2f64f154afb6c2fd34251c1fec2
    Reviewed-on: http://gerrit.cloudera.org:8080/16502
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/service/impala-hs2-server.cc            |  9 +++-
 be/src/service/impala-server.cc                | 34 ++++++++++++-
 be/src/service/impala-server.h                 |  8 ++-
 common/thrift/ImpalaService.thrift             | 11 +++-
 shell/impala_client.py                         | 22 +++++---
 shell/impala_shell.py                          | 70 ++++++++++++++++++--------
 tests/custom_cluster/test_shell_interactive.py | 23 ++++++++-
 7 files changed, 144 insertions(+), 33 deletions(-)
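
From the shell, a user can now type 'summary all;' after a retried query to see
both attempts. On the client side, the visible change is that get_summary() returns
a (summary, failed_summary) pair instead of a bare TExecSummary. Below is a minimal
sketch of consuming the new return shape, where 'client' and 'handle' are
placeholder names for an already-connected ImpalaHS2Client and a finished query
handle.

    # Sketch only: 'client' and 'handle' are placeholders for an already-connected
    # ImpalaHS2Client and the handle of a completed query.
    def print_attempt_node_counts(client, handle):
        summary, failed_summary = client.get_summary(handle)
        # 'summary' always describes the most recent (possibly retried) attempt.
        print("latest attempt: %d plan nodes" % len(summary.nodes or []))
        # 'failed_summary' is None unless the query was transparently retried; if
        # set, it carries the TExecSummary of the original attempt that failed.
        if failed_summary is not None:
            print("original attempt: %d plan nodes" % len(failed_summary.nodes or []))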

diff --git a/be/src/service/impala-hs2-server.cc b/be/src/service/impala-hs2-server.cc
index 43f01a1..5c618fb 100644
--- a/be/src/service/impala-hs2-server.cc
+++ b/be/src/service/impala-hs2-server.cc
@@ -1017,8 +1017,15 @@ void ImpalaServer::GetExecSummary(TGetExecSummaryResp& return_val,
       SQLSTATE_GENERAL_ERROR);
 
   TExecSummary summary;
-  Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &summary);
+  TExecSummary original_summary;
+  bool was_retried = false;
+  Status status = GetExecSummary(query_id, GetEffectiveUser(*session), &summary,
+      &original_summary, &was_retried);
   HS2_RETURN_IF_ERROR(return_val, status, SQLSTATE_GENERAL_ERROR);
+  if (request.include_query_attempts && was_retried) {
+    return_val.failed_summaries.emplace_back(original_summary);
+    return_val.__isset.failed_summaries = true;
+  }
   return_val.__set_summary(summary);
   return_val.status.__set_statusCode(thrift::TStatusCode::SUCCESS_STATUS);
 }
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ad0c3fc..b19ebae 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -813,7 +813,8 @@ Status ImpalaServer::DecompressToProfile(TRuntimeProfileFormat::type format,
 }
 
 Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& user,
-    TExecSummary* result) {
+    TExecSummary* result, TExecSummary* original_result, bool* was_retried) {
+  if (was_retried != nullptr) *was_retried = false;
   // Search for the query id in the active query map.
   {
     // QueryHandle of the current query.
@@ -858,6 +859,17 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
         result->error_logs.push_back(Substitute("Retrying query using query id: $0",
             PrintId(query_handle->query_id())));
         result->__isset.error_logs = true;
+        if (was_retried != nullptr) {
+          *was_retried = true;
+          DCHECK(original_result != nullptr);
+          // The original query cannot be in the PENDING state because it has
+          // already failed. Handle the other two cases as above.
+          if (original_query_handle->GetCoordinator() != nullptr) {
+            original_query_handle->GetCoordinator()->GetTExecSummary(original_result);
+          } else {
+            *original_result = TExecSummary();
+          }
+        }
       }
       return Status::OK();
     }
@@ -870,6 +882,7 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
     bool user_has_profile_access = false;
     bool is_query_missing = false;
     TExecSummary exec_summary;
+    TExecSummary retried_exec_summary;
     {
       lock_guard<mutex> l(query_log_lock_);
       QueryLogIndex::const_iterator query_record = query_log_index_.find(query_id);
@@ -878,6 +891,16 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
         effective_user = query_record->second->effective_user;
         user_has_profile_access = query_record->second->user_has_profile_access;
         exec_summary = query_record->second->exec_summary;
+        if (query_record->second->was_retried) {
+          if (was_retried != nullptr) *was_retried = true;
+          DCHECK(query_record->second->retried_query_id != nullptr);
+          QueryLogIndex::const_iterator retried_query_record =
+              query_log_index_.find(*query_record->second->retried_query_id);
+          // The retried query ran later than the original query. We should be able to
+          // find it in the query log since we have found the original query.
+          DCHECK(retried_query_record != query_log_index_.end());
+          retried_exec_summary = retried_query_record->second->exec_summary;
+        }
       }
     }
     if (is_query_missing) {
@@ -888,7 +911,14 @@ Status ImpalaServer::GetExecSummary(const TUniqueId& query_id, const string& use
       return Status::Expected(err);
     }
     RETURN_IF_ERROR(CheckProfileAccess(user, effective_user, user_has_profile_access));
-    *result = exec_summary;
+    if (was_retried != nullptr && *was_retried) {
+      DCHECK(original_result != nullptr);
+      // 'result' returns the latest summary so it's the retried one.
+      *result = retried_exec_summary;
+      *original_result = exec_summary;
+    } else {
+      *result = exec_summary;
+    }
   }
   return Status::OK();
 }
diff --git a/be/src/service/impala-server.h b/be/src/service/impala-server.h
index 03b0612..2347030 100644
--- a/be/src/service/impala-server.h
+++ b/be/src/service/impala-server.h
@@ -786,8 +786,14 @@ class ImpalaServer : public ImpalaServiceIf,
   /// summary is the same user that run the query and that user has access to the full
   /// query profile. Otherwise, an error status is returned to indicate an
   /// authorization error.
+  /// If 'original_result' and 'was_retried' are not null pointers, returns whether the
+  /// query was retried in '*was_retried'. If the query was retried, returns the exec
+  /// summary of the original query in '*original_result'. '*original_result' won't be
+  /// set if the query was not retried. 'original_result' and 'was_retried' must both be
+  /// valid pointers if either of them is used.
   Status GetExecSummary(const TUniqueId& query_id, const std::string& user,
-      TExecSummary* result) WARN_UNUSED_RESULT;
+      TExecSummary* result, TExecSummary* original_result = nullptr,
+      bool* was_retried = nullptr) WARN_UNUSED_RESULT;
 
   /// Collect ExecSummary and update it to the profile in request_state
   void UpdateExecSummary(const QueryHandle& query_handle) const;
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 7a18523..b71da24 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -708,12 +708,21 @@ struct TGetExecSummaryReq {
   1: optional TCLIService.TOperationHandle operationHandle
 
   2: optional TCLIService.TSessionHandle sessionHandle
+
+  // If true, returns the summaries of all query attempts. A TGetExecSummaryResp
+  // always returns the summary for the most recent query attempt, regardless of the
+  // query id specified. Clients should set this to true if they want to retrieve the
+  // summaries of all query attempts (including the failed ones).
+  3: optional bool include_query_attempts = false
 }
 
 struct TGetExecSummaryResp {
   1: required TCLIService.TStatus status
 
   2: optional ExecStats.TExecSummary summary
+
+  // A list of all summaries of the failed query attempts.
+  3: optional list<ExecStats.TExecSummary> failed_summaries
 }
 
 struct TGetRuntimeProfileReq {
@@ -724,7 +733,7 @@ struct TGetRuntimeProfileReq {
   3: optional RuntimeProfile.TRuntimeProfileFormat format =
       RuntimeProfile.TRuntimeProfileFormat.STRING
 
-  // If true, returns the profiles of all query attempts. A TGetRuntimeProfileReq
+  // If true, returns the profiles of all query attempts. A TGetRuntimeProfileResp
   // always returns the profile for the most recent query attempt, regardless of the
   // query id specified. Clients should set this to true if they want to retrieve the
   // profiles of all query attempts (including the failed ones).
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 684ef68..f8a7e4f 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -303,15 +303,19 @@ class ImpalaClient(object):
   def get_runtime_profile(self, last_query_handle):
     """Get the runtime profile string from the server. Returns None if
     an error was encountered. If the query was retried, returns the profile of the failed
-    profile as well; the tuple (profile, failed_profile) is returned where 'profile' is
+    attempt as well; the tuple (profile, failed_profile) is returned where 'profile' is
     the profile of the most recent query attempt and 'failed_profile' is the profile of
-    the original query attempt that failed.. Currently, only the HS2 protocol supports
+    the original query attempt that failed. Currently, only the HS2 protocol supports
     returning the failed profile."""
     raise NotImplementedError()
 
   def get_summary(self, last_query_handle):
     """Get the thrift TExecSummary from the server. Returns None if
-    an error was encountered."""
+    an error was encountered. If the query was retried, returns TExecSummary of the failed
+    attempt as well; the tuple (summary, failed_summary) is returned where 'summary' is
+    the TExecSummary of the most recent query attempt and 'failed_summary' is the
+    TExecSummary of the original query attempt that failed. Currently, only the HS2
+    protocol supports returning the failed summary."""
     raise NotImplementedError()
 
   def _get_warn_or_error_log(self, last_query_handle, warn):
@@ -882,14 +886,18 @@ class ImpalaHS2Client(ImpalaClient):
     return resp.profile, failed_profile
 
   def get_summary(self, last_query_handle):
-    req = TGetExecSummaryReq(last_query_handle, self.session_handle)
+    req = TGetExecSummaryReq(last_query_handle, self.session_handle,
+        include_query_attempts=True)
 
     def GetExecSummary():
       return self.imp_service.GetExecSummary(req)
     # GetExecSummary rpc is idempotent and so safe to retry.
     resp = self._do_hs2_rpc(GetExecSummary, retry_on_error=True)
     self._check_hs2_rpc_status(resp.status)
-    return resp.summary
+    failed_summary = None
+    if resp.failed_summaries and len(resp.failed_summaries) >= 1:
+      failed_summary = resp.failed_summaries[0]
+    return resp.summary, failed_summary
 
   def get_column_names(self, last_query_handle):
     # The handle has the schema embedded in it.
@@ -1154,8 +1162,8 @@ class ImpalaBeeswaxClient(ImpalaClient):
     summary, rpc_status = self._do_beeswax_rpc(
       lambda: self.imp_service.GetExecSummary(last_query_handle))
     if rpc_status == RpcStatus.OK and summary:
-      return summary
-    return None
+      return summary, None
+    return None, None
 
   def get_column_names(self, last_query_handle):
     # Note: the code originally ignored the RPC status. don't mess with it.
diff --git a/shell/impala_shell.py b/shell/impala_shell.py
index 3d1d810..cf06caf 100755
--- a/shell/impala_shell.py
+++ b/shell/impala_shell.py
@@ -103,13 +103,13 @@ class QueryOptionDisplayModes:
   ALL_OPTIONS = 2
 
 
-class QueryProfileDisplayModes:
-  """The display mode when runtime profiles are printed to the console. If the query has
-  not been retried, then the display mode does not change anything. The format is always
-  the same. If the query has been retried, then the ALL option will print both the
-  original and retried profiles. If the LATEST option is specified, then only the retried
-  profile will be printed. If the ORIGINAL option is specified, then only the original
-  profile will be printed."""
+class QueryAttemptDisplayModes:
+  """The display mode when runtime profiles or summaries are printed to the console.
+  If the query has not been retried, then the display mode does not change anything.
+  The format is always the same. If the query has been retried, then the ALL option will
+  print both the original and retried profiles/summaries. If the LATEST option is
+  specified, then only the retried profile/summary will be printed. If the ORIGINAL option
+  is specified, then only the original profile/summary will be printed."""
   ALL = "all"
   LATEST = "latest"
   ORIGINAL = "original"
@@ -692,13 +692,22 @@ class ImpalaShell(cmd.Cmd, object):
     return status
 
   def do_summary(self, args):
+    split_args = args.split()
+    if len(split_args) > 1:
+      print("'summary' only accepts 0 or 1 arguments", file=sys.stderr)
+      return CmdStatus.ERROR
     if not self.last_query_handle:
       print("Could not retrieve summary: no previous query.", file=sys.stderr)
       return CmdStatus.ERROR
 
-    summary = None
+    display_mode = QueryAttemptDisplayModes.LATEST
+    if len(split_args) == 1:
+      display_mode = self.get_query_attempt_display_mode(split_args[0])
+      if display_mode is None:
+        return CmdStatus.ERROR
+
     try:
-      summary = self.imp_client.get_summary(self.last_query_handle)
+      summary, failed_summary = self.imp_client.get_summary(self.last_query_handle)
     except RPCException as e:
       import re
       error_pattern = re.compile("ERROR: Query id \d+:\d+ not found.")
@@ -710,6 +719,30 @@ class ImpalaShell(cmd.Cmd, object):
     if summary.nodes is None:
       print("Summary not available", file=sys.stderr)
       return CmdStatus.SUCCESS
+
+    if display_mode == QueryAttemptDisplayModes.ALL:
+      print("Query Summary:")
+      self.print_exec_summary(summary)
+      if failed_summary:
+        print("Failed Query Summary:")
+        self.print_exec_summary(failed_summary)
+    elif display_mode == QueryAttemptDisplayModes.LATEST:
+      self.print_exec_summary(summary)
+    elif display_mode == QueryAttemptDisplayModes.ORIGINAL:
+      self.print_exec_summary(failed_summary if failed_summary else summary)
+    else:
+      raise FatalShellException("Invalid value for query summary display mode")
+
+  @staticmethod
+  def get_query_attempt_display_mode(arg_mode):
+    arg_mode = str(arg_mode).lower()
+    if arg_mode not in [QueryAttemptDisplayModes.ALL,
+        QueryAttemptDisplayModes.LATEST, QueryAttemptDisplayModes.ORIGINAL]:
+      print("Invalid value for query attempt display mode: \'" +
+          arg_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
+    return arg_mode
+
+  def print_exec_summary(self, summary):
     output = []
     table = self._default_summary_table()
     self.imp_client.build_summary_table(summary, 0, False, 0, False, output)
@@ -994,7 +1027,7 @@ class ImpalaShell(cmd.Cmd, object):
       file_descriptor.flush()
 
   def print_runtime_profile(self, profile, failed_profile,
-          profile_display_mode=QueryProfileDisplayModes.LATEST, status=False):
+        profile_display_mode=QueryAttemptDisplayModes.LATEST, status=False):
     """Prints the given runtime profiles to the console. Optionally prints the failed
     profile if the query was retried. The format the profiles are printed is controlled
     by the option profile_display_mode, see QueryProfileDisplayModes docs above.
@@ -1002,13 +1035,13 @@ class ImpalaShell(cmd.Cmd, object):
     if self.show_profiles or status:
       if profile:
         query_profile_prefix = "Query Runtime Profile:\n"
-        if profile_display_mode == QueryProfileDisplayModes.ALL:
+        if profile_display_mode == QueryAttemptDisplayModes.ALL:
           print(query_profile_prefix + profile)
           if failed_profile:
             print("Failed Query Runtime Profile(s):\n" + failed_profile)
-        elif profile_display_mode == QueryProfileDisplayModes.LATEST:
+        elif profile_display_mode == QueryAttemptDisplayModes.LATEST:
           print(query_profile_prefix + profile)
-        elif profile_display_mode == QueryProfileDisplayModes.ORIGINAL:
+        elif profile_display_mode == QueryAttemptDisplayModes.ORIGINAL:
           print(query_profile_prefix + failed_profile if failed_profile else profile)
         else:
           raise FatalShellException("Invalid value for query profile display mode")
@@ -1063,13 +1096,10 @@ class ImpalaShell(cmd.Cmd, object):
       return CmdStatus.ERROR
 
     # Parse and validate the QueryProfileDisplayModes option.
-    profile_display_mode = QueryProfileDisplayModes.LATEST
+    profile_display_mode = QueryAttemptDisplayModes.LATEST
     if len(split_args) == 1:
-      profile_display_mode = str(split_args[0]).lower()
-      if profile_display_mode not in [QueryProfileDisplayModes.ALL,
-              QueryProfileDisplayModes.LATEST, QueryProfileDisplayModes.ORIGINAL]:
-        print("Invalid value for query profile display mode: \'" +
-                profile_display_mode + "\'. Valid values are [ALL | LATEST | ORIGINAL]")
+      profile_display_mode = self.get_query_attempt_display_mode(split_args[0])
+      if profile_display_mode is None:
         return CmdStatus.ERROR
 
     profile, failed_profile = self.imp_client.get_runtime_profile(
@@ -1119,7 +1149,7 @@ class ImpalaShell(cmd.Cmd, object):
 
     checkpoint = time.time()
     if checkpoint - self.last_summary > self.PROGRESS_UPDATE_INTERVAL:
-      summary = self.imp_client.get_summary(self.last_query_handle)
+      summary, failed_summary = self.imp_client.get_summary(self.last_query_handle)
       if not summary:
         return
 
diff --git a/tests/custom_cluster/test_shell_interactive.py b/tests/custom_cluster/test_shell_interactive.py
index e1e4616..361a126 100644
--- a/tests/custom_cluster/test_shell_interactive.py
+++ b/tests/custom_cluster/test_shell_interactive.py
@@ -63,7 +63,7 @@ class TestShellInteractive(CustomClusterTestSuite):
   _query_retry_options = "set retry_failed_queries=true;"
 
   @pytest.mark.execute_serially
-  def test_query_retries_profile_cmd(self):
+  def test_query_retries_profile_and_summary_cmd(self):
     """Tests transparent query retries via impala-shell. Validates the output of the
     'profile [all | latest | original];' commands in impala-shell."""
     query = "select count(*) from functional.alltypes where bool_col = sleep(50)"
@@ -100,6 +100,27 @@ class TestShellInteractive(CustomClusterTestSuite):
     self.__proc_not_expect(proc, "Failed Query Runtime Profile\(s\):")
     self.__proc_not_expect(proc, "Query State: FINISHED")
 
+    # Check the output of 'summary all'
+    proc.sendline("summary all;")
+    proc.expect("Query Summary:")
+    # The retried query runs on 2 instances.
+    proc.expect("00:SCAN HDFS\w*| 2\w*| 2")
+    proc.expect("Failed Query Summary:")
+    # The original query runs on 3 instances.
+    proc.expect("00:SCAN HDFS\w*| 3\w*| 3")
+
+    # Check the output of 'summary latest' and 'summary'. The output of both cmds
+    # should be equivalent.
+    for summary_cmd in ["summary latest;", "summary;"]:
+      proc.sendline(summary_cmd)
+      # The retried query runs on 2 instances.
+      proc.expect("00:SCAN HDFS\w*| 2\w*| 2")
+
+    # Check the output of 'summary original'
+    proc.sendline("summary original")
+    # The original query runs on 3 instances.
+    proc.expect("00:SCAN HDFS\w*| 3\w*| 3")
+
   @pytest.mark.execute_serially
   def test_query_retries_show_profiles(self):
     """Tests transparent query retries via impala-shell. Validates that the output of the