You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by db...@apache.org on 2022/08/04 17:32:13 UTC

[impala] 03/03: IMPALA-11469: Make prefix of ignored staging dirs configurable

This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit abcb62b676539b85c7c428ed385177f591de3492
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Wed Aug 3 16:46:45 2022 +0800

    IMPALA-11469: Make prefix of ignored staging dirs configurable
    
    External systems like Hive or Spark will write temporary or "non-data"
    files in the table location. Catalogd will skip them when loading file
    metadata. However, the prefix is currently hard coded. We recently found
    that Spark streaming will generate a _spark_metadata dir which is not
    handled correctly.
    
    To avoid future code changes when interacting with more systems, this patch
    adds a new startup flag, ignored_dir_prefix_list, for catalogd. It's a
    comma-separated list of prefixes of ignored dirs. Currently, the
    default value is ".,_tmp.,_spark_metadata". Users can add more in the
    future.
    
    Tests:
     - Add a case for _spark_metadata in FileSystemUtilTest
    
    Change-Id: I108bfa823281a35d28932f7ccce0b12a0c5af57d
    Reviewed-on: http://gerrit.cloudera.org:8080/18811
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/util/backend-gflag-util.cc                  |  5 ++++
 common/thrift/BackendGflags.thrift                 |  2 ++
 .../org/apache/impala/common/FileSystemUtil.java   | 27 +++++++++++++++++-----
 .../org/apache/impala/service/BackendConfig.java   |  4 ++++
 .../apache/impala/common/FileSystemUtilTest.java   | 19 +++++++++++++--
 5 files changed, 49 insertions(+), 8 deletions(-)

diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 3e69c3dd6..9a8d17f1d 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -191,6 +191,10 @@ DEFINE_bool(use_hms_column_order_for_hbase_tables, false,
     "Use the column order in HMS for HBase tables instead of ordering the columns by "
     "family/qualifier. Keeping the default as false for backward compatibility.");
 
+DEFINE_string(ignored_dir_prefix_list, ".,_tmp.,_spark_metadata",
+    "Comma separated list to specify the prefix for tmp/staging dirs that catalogd should"
+    " skip in loading file metadata.");
+
 namespace impala {
 
 Status GetConfigFromCommand(const string& flag_cmd, string& result) {
@@ -330,6 +334,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
   cfg.__set_pull_table_types_and_comments(FLAGS_pull_table_types_and_comments);
   cfg.__set_use_hms_column_order_for_hbase_tables(
       FLAGS_use_hms_column_order_for_hbase_tables);
+  cfg.__set_ignored_dir_prefix_list(FLAGS_ignored_dir_prefix_list);
   return Status::OK();
 }
 
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 9271beecf..a3317be89 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -227,4 +227,6 @@ struct TBackendGflags {
   101: required bool pull_table_types_and_comments
 
   102: required bool use_hms_column_order_for_hbase_tables
+
+  103: required string ignored_dir_prefix_list
 }
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index e3c56a6f4..d3e58c0eb 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsAdmin;
 import org.apache.hadoop.hdfs.protocol.EncryptionZone;
 import org.apache.impala.catalog.HdfsCompression;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.util.DebugUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Set;
@@ -846,12 +849,19 @@ public class FileSystemUtil {
   }
 
   /**
-   * Prefix string used by hive to write certain temporary or "non-data" files in the
-   * table location
+   * Prefix string used by tools like hive/spark/flink to write certain temporary or
+   * "non-data" files in the table location
    */
-  public static final String HIVE_TEMP_FILE_PREFIX = "_tmp.";
-
-  public static final String DOT = ".";
+  private static final List<String> TMP_DIR_PREFIX_LIST = new ArrayList<>();
+  static {
+    String s = BackendConfig.INSTANCE.getIgnoredDirPrefixList();
+    for (String prefix : s.split(",")) {
+      if (!prefix.isEmpty()) {
+        TMP_DIR_PREFIX_LIST.add(prefix);
+      }
+    }
+    LOG.info("Prefix list of ignored dirs: " + TMP_DIR_PREFIX_LIST);
+  }
 
   /**
    * Util method used to filter out hidden and temporary staging directories
@@ -861,7 +871,12 @@ public class FileSystemUtil {
   @VisibleForTesting
   static boolean isIgnoredDir(Path path) {
     String filename = path.getName();
-    return filename.startsWith(DOT) || filename.startsWith(HIVE_TEMP_FILE_PREFIX);
+    for (String prefix : TMP_DIR_PREFIX_LIST) {
+      if (filename.startsWith(prefix)) {
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 086a4c213..0261ec08e 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -363,4 +363,8 @@ public class BackendConfig {
   public boolean useHmsColumnOrderForHBaseTables() {
     return backendCfg_.use_hms_column_order_for_hbase_tables;
   }
+
+  public String getIgnoredDirPrefixList() {
+    return backendCfg_.ignored_dir_prefix_list;
+  }
 }
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
index adbefe9b0..dba2bf4c4 100644
--- a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.impala.common;
 
-import static org.apache.impala.common.FileSystemUtil.HIVE_TEMP_FILE_PREFIX;
 import static org.apache.impala.common.FileSystemUtil.isIgnoredDir;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -26,6 +25,9 @@ import static org.junit.Assert.assertEquals;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.impala.service.BackendConfig;
+import org.apache.impala.thrift.TBackendGflags;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -38,10 +40,19 @@ import java.util.List;
  * Tests for the various util methods in FileSystemUtil class
  */
 public class FileSystemUtilTest {
-
+  private static final String HIVE_TEMP_FILE_PREFIX = "_tmp.";
+  private static final String SPARK_TEMP_FILE_PREFIX = "_spark_metadata";
   private static final Path TEST_TABLE_PATH = new Path("/test-warehouse/foo"
       + ".db/filesystem-util-test");
 
+  @Before
+  public void setUp()  {
+    // Make sure BackendConfig is initialized.
+    if (BackendConfig.INSTANCE == null) {
+      BackendConfig.create(new TBackendGflags());
+    }
+  }
+
   @Test
   public void testIsInIgnoredDirectory() {
     // test positive cases
@@ -60,6 +71,10 @@ public class FileSystemUtilTest {
         testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH,
             HIVE_TEMP_FILE_PREFIX + "delta_000000_2/test.manifest")));
 
+    assertTrue("Files in spark temporary directories should be ignored",
+        testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH,
+            SPARK_TEMP_FILE_PREFIX + "/0")));
+
     //multiple nested levels
     assertTrue(testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH,
         ".hive-staging/nested-1/nested-2/nested-3/tempfile")));