You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by kw...@apache.org on 2019/07/09 17:42:53 UTC

[impala] 03/03: IMPALA-8663 : FileMetadataLoader should skip hidden and tmp directories

This is an automated email from the ASF dual-hosted git repository.

kwho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fc974f944a9266e68e6f1694eecdc2160fd52582
Author: Vihang Karajgaonkar <vi...@cloudera.com>
AuthorDate: Mon Jun 17 13:27:03 2019 -0700

    IMPALA-8663 : FileMetadataLoader should skip hidden and tmp directories
    
    The FileMetadataLoader is used to load the file information in when the
    table is loaded. By default, it lists all the files in the
    table/partition directory. Currently, it only skips the filenames which
    are invalid (hidden files and ones starting with "_" etc). However, it
    does not skip the directories which are temporary or hidden. In case of
    Hive when data is inserted into a table, it creates a temporary staging
    directory which is a hidden directory under the table location. When the
    insert in hive is completed, such staging directories are removed. But
    if there is a refresh called during that time, FileMetadataLoader will
    add the files in the staging directory as well. Not only this could
    cause temporary invalid results but it causes table to go in a bad state
    when these temporary directories are removed. The only work-around in
    such a case to issue a refresh on the table again.
    
    This patch adds logic in the filemetadataloader to ignore such temporary
    staging directories. Unfortunately, hadoop does not provide a API which
    can recursively list files in a directory and skip certain directories.
    This patch adds a new FilterIterator which wraps around existing
    listFiles, listStatus and RecursingIterator to skip the hidden
    directories from the listing result.
    
    Also, the existing code to recover partitions implements its own
    recursion logic which includes path validation. This already skips such
    hidden directories since they do not conform to the partition spec. The
    patch does a minor modification to this method by directly calling the
    listStatusIterator instead of going through FileSystemUtil#listStatus
    whiche uses the filtering remote iterator now.
    
    Testing:
    1. Added a new tests as well as modified existing ones which were
    related to cover interesting cases.
    2. Ran concurrent inserts from Hive while issuing refresh in a loop on
    Impala side. Earlier this would cause the table to go into a bad state.
    Now, it works fine for the staging directories. It still runs into a
    FileNotFoundException from the impalad when there are insert overwrite
    statements in Hive
    
    Change-Id: I2c4a22908304fe9e377d77d6c18d401c3f3294aa
    Reviewed-on: http://gerrit.cloudera.org:8080/13665
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Vihang Karajgaonkar <vi...@cloudera.com>
---
 .../java/org/apache/impala/catalog/HdfsTable.java  |  4 +-
 .../org/apache/impala/common/FileSystemUtil.java   | 99 ++++++++++++++++++++--
 .../impala/catalog/FileMetadataLoaderTest.java     | 28 +++++-
 .../apache/impala/common/FileSystemUtilTest.java   | 97 +++++++++++++++++++++
 .../java/org/apache/impala/util/AcidUtilsTest.java |  3 +
 tests/metadata/test_recursive_listing.py           | 14 +++
 6 files changed, 236 insertions(+), 9 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 779a96e..0255e21 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1615,8 +1615,8 @@ public class HdfsTable extends Table implements FeFsTable {
       return;
     }
 
-    RemoteIterator<? extends FileStatus> statuses = FileSystemUtil.listStatus(fs, path,
-        /*recursive=*/false);
+    RemoteIterator <? extends FileStatus> statuses = fs.listStatusIterator(path);
+
     if (statuses == null) return;
     while (statuses.hasNext()) {
       FileStatus status = statuses.next();
diff --git a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
index 4c3f65b..f2de972 100644
--- a/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
+++ b/fe/src/main/java/org/apache/impala/common/FileSystemUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.impala.common;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import org.apache.commons.io.IOUtils;
@@ -25,7 +26,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.adl.AdlFileSystem;
@@ -564,13 +564,13 @@ public class FileSystemUtil {
         // even though it returns LocatedFileStatus objects with "fake" blocks which we
         // will ignore.
         if (isS3AFileSystem(fs)) {
-          return listFiles(fs, p, recursive);
+          return listFiles(fs, p, true);
         }
 
-        return new RecursingIterator(fs, p);
+        return new FilterIterator(p, new RecursingIterator(fs, p));
       }
 
-      return fs.listStatusIterator(p);
+      return new FilterIterator(p, fs.listStatusIterator(p));
     } catch (FileNotFoundException e) {
       if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
       return null;
@@ -580,10 +580,10 @@ public class FileSystemUtil {
   /**
    * Wrapper around FileSystem.listFiles(), similar to the listStatus() wrapper above.
    */
-  public static RemoteIterator<LocatedFileStatus> listFiles(FileSystem fs, Path p,
+  public static RemoteIterator<? extends FileStatus> listFiles(FileSystem fs, Path p,
       boolean recursive) throws IOException {
     try {
-      return fs.listFiles(p, recursive);
+      return new FilterIterator(p, fs.listFiles(p, recursive));
     } catch (FileNotFoundException e) {
       if (LOG.isWarnEnabled()) LOG.warn("Path does not exist: " + p.toString(), e);
       return null;
@@ -612,6 +612,93 @@ public class FileSystemUtil {
   }
 
   /**
+   * Util method to check if the given file status relative to its parent is contained
+   * in a ignored directory. This is useful to ignore the files which seemingly are valid
+   * just by themselves but should still be ignored if they are contained in a
+   * directory which needs to be ignored
+   *
+   * @return true if the fileStatus should be ignored, false otherwise
+   */
+  @VisibleForTesting
+  static boolean isInIgnoredDirectory(Path parent, FileStatus fileStatus) {
+    Preconditions.checkNotNull(fileStatus);
+    Path currentPath = fileStatus.isDirectory() ? fileStatus.getPath() :
+        fileStatus.getPath().getParent();
+    while (currentPath != null && !currentPath.equals(parent)) {
+      if (isIgnoredDir(currentPath)) {
+        LOG.debug("Ignoring {} since it is either in a hidden directory or a temporary "
+                + "staging directory {}", fileStatus.getPath(), currentPath);
+        return true;
+      }
+      currentPath = currentPath.getParent();
+    }
+    return false;
+  }
+
+  /**
+   * Prefix string used by hive to write certain temporary or "non-data" files in the
+   * table location
+   */
+  public static final String HIVE_TEMP_FILE_PREFIX = "_tmp.";
+
+  public static final String DOT = ".";
+
+  /**
+   * Util method used to filter out hidden and temporary staging directories
+   * which tools like Hive create in the table/partition directories when a query is
+   * inserting data into them.
+   */
+  @VisibleForTesting
+  static boolean isIgnoredDir(Path path) {
+    String filename = path.getName();
+    return filename.startsWith(DOT) || filename.startsWith(HIVE_TEMP_FILE_PREFIX);
+  }
+
+  /**
+   * A remote iterator which takes in another Remote Iterator and a start path and filters
+   * all the ignored directories
+   * (See {@link org.apache.impala.common.FileSystemUtil#isInIgnoredDirectory}) from the
+   * listing of the remote iterator
+   */
+  static class FilterIterator implements RemoteIterator<FileStatus> {
+    private final RemoteIterator<? extends FileStatus> baseIterator_;
+    private FileStatus curFile_ = null;
+    private final Path startPath_;
+
+    FilterIterator(Path startPath, RemoteIterator<? extends FileStatus> baseIterator) {
+      startPath_ = Preconditions.checkNotNull(startPath);
+      baseIterator_ = Preconditions.checkNotNull(baseIterator);
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      // Pull the next file to be returned into 'curFile'. If we've already got one,
+      // we don't need to do anything (extra calls to hasNext() must not affect
+      // state)
+      while (curFile_ == null) {
+        if (!baseIterator_.hasNext()) return false;
+        // if the next fileStatus is in ignored directory skip it
+        FileStatus next = baseIterator_.next();
+        if (!isInIgnoredDirectory(startPath_, next)) {
+          curFile_ = next;
+          return true;
+        }
+      }
+      return true;
+    }
+
+    @Override
+    public FileStatus next() throws IOException {
+      if (hasNext()) {
+        FileStatus next = curFile_;
+        curFile_ = null;
+        return next;
+      }
+      throw new NoSuchElementException("No more entries");
+    }
+  }
+
+  /**
    * Iterator which recursively visits directories on a FileSystem, yielding
    * files in an unspecified order.
    */
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 9ade422..ffc0481 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.thrift.TNetworkAddress;
@@ -34,7 +35,6 @@ import org.junit.Test;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 
-
 public class FileMetadataLoaderTest {
 
   @Test
@@ -86,6 +86,32 @@ public class FileMetadataLoaderTest {
     }
   }
 
+  @Test
+  public void testSkipHiddenDirectories() throws IOException {
+    Path sourcePath = new Path("hdfs://localhost:20500/test-warehouse/alltypes/");
+    Path tmpTestPath = new Path("hdfs://localhost:20500/tmp/test-filemetadata-loader");
+    Configuration conf = new Configuration();
+    FileSystem dstFs = tmpTestPath.getFileSystem(conf);
+    FileSystem srcFs = sourcePath.getFileSystem(conf);
+    //copy the file-structure of a valid table
+    FileUtil.copy(srcFs, sourcePath, dstFs, tmpTestPath, false, true, conf);
+    dstFs.deleteOnExit(tmpTestPath);
+    // create a hidden directory similar to what hive does
+    Path hiveStaging = new Path(tmpTestPath, ".hive-staging_hive_2019-06-13_1234");
+    dstFs.mkdirs(hiveStaging);
+    Path manifestDir = new Path(tmpTestPath, "_tmp.base_0000007");
+    dstFs.mkdirs(manifestDir);
+    dstFs.createNewFile(new Path(manifestDir, "000000_0.manifest"));
+    dstFs.createNewFile(new Path(hiveStaging, "tmp-stats"));
+    dstFs.createNewFile(new Path(hiveStaging, ".hidden-tmp-stats"));
+
+    FileMetadataLoader fml = new FileMetadataLoader(tmpTestPath, true,
+        Collections.emptyList(), new ListMap <>(), null);
+    fml.load();
+    assertEquals(24, fml.getStats().loadedFiles);
+    assertEquals(24, fml.getLoadedFds().size());
+  }
+
   // TODO(todd) add unit tests for loading ACID tables once we have some ACID
   // tables with data loaded in the functional test DBs.
 }
diff --git a/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
new file mode 100644
index 0000000..030961a
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/common/FileSystemUtilTest.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.common;
+
+import static org.apache.impala.common.FileSystemUtil.HIVE_TEMP_FILE_PREFIX;
+import static org.apache.impala.common.FileSystemUtil.isIgnoredDir;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for the various util methods in FileSystemUtil class
+ */
+public class FileSystemUtilTest {
+
+  private static final Path TEST_TABLE_PATH = new Path("/test-warehouse/foo"
+      + ".db/filesystem-util-test");
+
+  @Test
+  public void testIsInIgnoredDirectory() {
+    // test positive cases
+    assertTrue("Files in hive staging directory should be ignored",
+        testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH, "/part=1/"
+            + ".hive-staging/tempfile")));
+
+    assertTrue("Files in hidden directory ignored",
+        testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH, ".hidden/000000_0")));
+
+    assertTrue("Files in the hive temporary directories should be ignored",
+        testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH,
+            HIVE_TEMP_FILE_PREFIX + "base_0000000_1/000000_1.manifest")));
+
+    assertTrue("Files in hive temporary directories should be ignored",
+        testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH,
+            HIVE_TEMP_FILE_PREFIX + "delta_000000_2/test.manifest")));
+
+    //multiple nested levels
+    assertTrue(testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH,
+        ".hive-staging/nested-1/nested-2/nested-3/tempfile")));
+
+    // test negative cases
+    // table path should not ignored
+    assertFalse(testIsInIgnoredDirectory(TEST_TABLE_PATH));
+    assertFalse(
+        testIsInIgnoredDirectory(new Path("hdfs://localhost:20500" + TEST_TABLE_PATH)));
+    // partition path
+    assertFalse(testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH + "/part=1/000000")));
+    assertFalse(testIsInIgnoredDirectory(
+        new Path("hdfs://localhost:20500" + TEST_TABLE_PATH + "/part=1/00000")));
+    // nested directories for ACID tables should not be ignored
+    assertFalse(testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH, "/part=100"
+        + "/base_0000005/datafile")));
+    assertFalse(testIsInIgnoredDirectory(new Path(TEST_TABLE_PATH,
+        "/delta_0000001_0000002/deltafile")));
+
+  }
+
+  @Test
+  public void testIsIgnoredDir() {
+    assertTrue("Directory should be ignored if it starts with _tmp.",
+        isIgnoredDir(new Path(TEST_TABLE_PATH, HIVE_TEMP_FILE_PREFIX + "dummy")));
+    assertTrue("Directory should be ignored if its hidden",
+        isIgnoredDir(new Path(TEST_TABLE_PATH, ".hidden-dir")));
+    assertFalse(isIgnoredDir(TEST_TABLE_PATH));
+    assertFalse(isIgnoredDir(new Path(TEST_TABLE_PATH + "/part=100/datafile")));
+  }
+
+  private boolean testIsInIgnoredDirectory(Path input) {
+    return testIsInIgnoredDirectory(input, true);
+  }
+
+  private boolean testIsInIgnoredDirectory(Path input, boolean isDir) {
+    FileStatus mockFileStatus = Mockito.mock(FileStatus.class);
+    Mockito.when(mockFileStatus.getPath()).thenReturn(input);
+    Mockito.when(mockFileStatus.isDirectory()).thenReturn(isDir);
+    return FileSystemUtil.isInIgnoredDirectory(TEST_TABLE_PATH, mockFileStatus);
+  }
+}
diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
index 25dc90d..86a47d7 100644
--- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
@@ -84,6 +84,7 @@ public class AcidUtilsTest {
           "base_0000005/abc.txt",
           "base_0000005/0000/",
           "base_0000005/0000/abc.txt",
+          "_tmp.base_0000005/000000_0.manifest",
           "abc/",
           "abc/base_0000006/", // Not at root, so shouldn't be handled.
           "base_00000100/def.txt"},
@@ -103,6 +104,7 @@ public class AcidUtilsTest {
           "base_0000005/abc.txt",
           "base_0000005/0000/",
           "base_0000005/0000/abc.txt",
+          "_tmp.base_0000005/000000_0.manifest",
           "delta_0000006_0000006_0000/",
           "delta_0000006_0000006_0000/000000_0",
           "delta_0000007_0000007_0000/",
@@ -129,6 +131,7 @@ public class AcidUtilsTest {
           "base_0000005/abc.txt",
           "base_0000005/0000/",
           "base_0000005/0000/abc.txt",
+          "_tmp.base_0000005/000000_0.manifest",
           "delta_0000006_0000006_0000/",
           "delta_0000006_0000006_0000/000000_0",
           "delta_0000007_0000007_0000/",
diff --git a/tests/metadata/test_recursive_listing.py b/tests/metadata/test_recursive_listing.py
index e569ec8..ab7abe8 100644
--- a/tests/metadata/test_recursive_listing.py
+++ b/tests/metadata/test_recursive_listing.py
@@ -92,6 +92,20 @@ class TestRecursiveListing(ImpalaTestSuite):
     assert len(self._show_files(fq_tbl_name)) == 3
     assert len(self._get_rows(fq_tbl_name)) == 3
 
+    # Create files in the nested hidden directories and refresh. Make sure it does not
+    # show up
+    self.filesystem_client.make_dir("{0}/.hive-staging".format(part_path[1:]))
+    self.filesystem_client.create_file(
+        "{0}/.hive-staging/file3.txt".format(part_path[1:]),
+        "data-should-be-ignored-by-impala")
+    self.filesystem_client.make_dir("{0}/_tmp.base_000000_1".format(part_path[1:]))
+    self.filesystem_client.create_file(
+        "{0}/_tmp.base_000000_1/000000_0.manifest".format(part_path[1:]),
+        "manifest-file_contents")
+    self.execute_query_expect_success(self.client, "refresh {0}".format(fq_tbl_name))
+    assert len(self._show_files(fq_tbl_name)) == 3
+    assert len(self._get_rows(fq_tbl_name)) == 3
+
     # Test that disabling recursive listings makes the nested files disappear.
     self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties(" +
         "'impala.disable.recursive.listing'='true')").format(fq_tbl_name))