Posted to commits@impala.apache.org by lv...@apache.org on 2017/02/16 21:03:46 UTC

[5/5] incubator-impala git commit: IMPALA-4840: Fix REFRESH performance regression.

IMPALA-4840: Fix REFRESH performance regression.

The fix for IMPALA-4172 introduced a performance regression
in the REFRESH command. The regression stems from reloading
the block metadata of every valid data file without checking
whether it has changed since the last load. This caused
unnecessary metadata loads for unchanged files and thus
increased the runtime.

The fix makes the refresh codepath (and other operations that
use the same codepath, such as INSERT) reload the metadata of
only the modified files, by doing a listStatus() on the
partition directory and checking the last modified time of
each file. Without this patch we relied on listFiles(), which
fetched the block locations regardless of whether the file had
changed and was significantly slower on unchanged tables. The
initial/invalidate metadata load still fetches the block
locations in bulk using listFiles(). A side effect of this
change is that refresh no longer picks up block location
changes after an HDFS block rebalance; for that case we
suggest "invalidate metadata", which loads the metadata from
scratch.
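
For illustration only (not part of the commit), the sketch below
shows the mtime-based change detection described above, using the
public Hadoop FileSystem API. The class, field, and method names
are invented for the example; the actual logic lives in
HdfsTable.refreshFileMetadata() in the diff further down.

  import java.io.IOException;
  import java.util.HashMap;
  import java.util.Map;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.BlockLocation;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Illustrative sketch, not the Impala implementation.
  public class IncrementalBlockMetadataLoader {
    // Cached per-file state from the previous load: file name -> mtime.
    private final Map<String, Long> lastSeenMtime = new HashMap<>();

    // Re-lists 'partitionDir' with the cheap listStatus() call and fetches
    // block locations only for files that are new or whose mtime changed.
    public void refresh(Path partitionDir, Configuration conf) throws IOException {
      FileSystem fs = partitionDir.getFileSystem(conf);
      for (FileStatus status : fs.listStatus(partitionDir)) {
        if (status.isDirectory()) continue;
        String name = status.getPath().getName();
        Long cachedMtime = lastSeenMtime.get(name);
        if (cachedMtime != null
            && cachedMtime.longValue() == status.getModificationTime()) {
          // Unchanged file: keep the previously loaded block metadata.
          continue;
        }
        // New or modified file: fetch its block locations explicitly.
        BlockLocation[] locations =
            fs.getFileBlockLocations(status, 0, status.getLen());
        // ... rebuild the file descriptor from 'locations' here ...
        lastSeenMtime.put(name, status.getModificationTime());
      }
    }
  }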

Additionally, this commit re-enables the reuse of metadata
during table refresh (which was disabled in IMPALA-4172) to
avoid reloading metadata from HMS every time.

Change-Id: I859b9fe93563ba886d0b5db6db42a14c88caada8
Reviewed-on: http://gerrit.cloudera.org:8080/6009
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/26eaa266
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/26eaa266
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/26eaa266

Branch: refs/heads/master
Commit: 26eaa266092a5d8b37e21fd19dfbae81a952ac74
Parents: bd1d445
Author: Bharath Vissapragada <bh...@cloudera.com>
Authored: Thu Feb 9 22:54:40 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Feb 16 04:52:54 2017 +0000

----------------------------------------------------------------------
 .../impala/catalog/CatalogServiceCatalog.java   |   2 +-
 .../org/apache/impala/catalog/HdfsTable.java    | 140 +++++++++++++++----
 2 files changed, 110 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/26eaa266/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
index 2c42874..8be0aa3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/CatalogServiceCatalog.java
@@ -974,7 +974,7 @@ public class CatalogServiceCatalog extends Catalog {
           throw new TableLoadingException("Error loading metadata for table: " +
               db.getName() + "." + tblName.getTable_name(), e);
         }
-        tbl.load(false, msClient.getHiveClient(), msTbl);
+        tbl.load(true, msClient.getHiveClient(), msTbl);
       }
       tbl.setCatalogVersion(newCatalogVersion);
       LOG.info(String.format("Refreshed table metadata: %s", tbl.getFullName()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/26eaa266/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 795dae2..6096ba9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -81,8 +81,10 @@ import org.apache.impala.util.TResultRowBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -311,28 +313,10 @@ public class HdfsTable extends Table {
         FileDescriptor fd = new FileDescriptor(fileName, fileStatus.getLen(),
             fileStatus.getModificationTime());
         BlockLocation[] locations = fileStatus.getBlockLocations();
-        String partPathDirName = partPathDir.toString();
-        for (BlockLocation loc: locations) {
-          Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
-          // Enumerate all replicas of the block, adding any unknown hosts
-          // to hostIndex_. We pick the network address from getNames() and
-          // map it to the corresponding hostname from getHosts().
-          List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
-              loc.getNames().length);
-          for (int i = 0; i < loc.getNames().length; ++i) {
-            TNetworkAddress networkAddress =
-                BlockReplica.parseLocation(loc.getNames()[i]);
-            replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
-                cachedHosts.contains(loc.getHosts()[i])));
-          }
-          FileBlock currentBlock =
-              new FileBlock(loc.getOffset(), loc.getLength(), replicas);
-          THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
-          fd.addThriftFileBlock(tHdfsFileBlock);
-          unknownDiskIdCount += loadDiskIds(loc, tHdfsFileBlock);
-        }
+        unknownDiskIdCount += setFdBlockMetadata(fd, locations);
         if (LOG.isTraceEnabled()) {
-          LOG.trace("Adding file md dir: " + partPathDirName + " file: " + fileName);
+          LOG.trace("Adding file md dir: " + partPathDir.toString() + " file: " +
+              fileName);
         }
         // Update the partitions' metadata that this file belongs to.
         for (HdfsPartition partition: partitions) {
@@ -354,6 +338,35 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Sets the block metadata for FileDescriptor 'fd' using block location metadata
+   * from 'locations'.
+   */
+  private int setFdBlockMetadata(FileDescriptor fd, BlockLocation[] locations)
+      throws IOException {
+    int unknownFdDiskIds = 0;
+    for (BlockLocation loc: locations) {
+      Set<String> cachedHosts = Sets.newHashSet(loc.getCachedHosts());
+      // Enumerate all replicas of the block, adding any unknown hosts
+      // to hostIndex_. We pick the network address from getNames() and
+      // map it to the corresponding hostname from getHosts().
+      List<BlockReplica> replicas = Lists.newArrayListWithExpectedSize(
+          loc.getNames().length);
+      for (int i = 0; i < loc.getNames().length; ++i) {
+        TNetworkAddress networkAddress =
+            BlockReplica.parseLocation(loc.getNames()[i]);
+        replicas.add(new BlockReplica(hostIndex_.getIndex(networkAddress),
+            cachedHosts.contains(loc.getHosts()[i])));
+      }
+      FileBlock currentBlock =
+          new FileBlock(loc.getOffset(), loc.getLength(), replicas);
+      THdfsFileBlock tHdfsFileBlock = currentBlock.toThrift();
+      fd.addThriftFileBlock(tHdfsFileBlock);
+      unknownFdDiskIds += loadDiskIds(loc, tHdfsFileBlock);
+    }
+    return unknownFdDiskIds;
+  }
+
+  /**
    * Loads the disk IDs for BlockLocation 'location' and its corresponding file block.
    * HDFS API for BlockLocation returns a storageID UUID string for each disk
    * hosting the block, which is then mapped to a 0-based integer id called disk ID.
@@ -388,6 +401,20 @@ public class HdfsTable extends Table {
   }
 
   /**
+   * Synthesize the block metadata for a given HdfsPartition object. Should only
+   * be called for FileSystems that do not support storage IDs.
+   */
+  private void synthesizeBlockMetadata(FileSystem fs, HdfsPartition partition)
+      throws IOException {
+    Preconditions.checkState(!FileSystemUtil.supportsStorageIds(fs));
+    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
+    Path partitionPath = partition.getLocationPath();
+    partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+    partsByPath.put(partitionPath, Lists.newArrayList(partition));
+    synthesizeBlockMetadata(fs, partitionPath, partsByPath);
+  }
+
+  /**
    * For filesystems that don't support BlockLocation API, synthesize file blocks
    * by manually splitting the file range into fixed-size blocks.  That way, scan
    * ranges can be derived from file blocks as usual.  All synthesized blocks are given
@@ -755,11 +782,62 @@ public class HdfsTable extends Table {
     loadMetadataAndDiskIds(dirsToLoad, partsByPath);
   }
 
-  private void loadMetadataAndDiskIds(HdfsPartition partition) throws CatalogException {
-    Path partDirPath = partition.getLocationPath();
-    HashMap<Path, List<HdfsPartition>> partsByPath = Maps.newHashMap();
-    partsByPath.put(partDirPath, Lists.newArrayList(partition));
-    loadMetadataAndDiskIds(Lists.newArrayList(partDirPath), partsByPath);
+  /**
+   * Refreshes block metadata information for 'partition'. This method is optimized for
+   * the case where the files in the partition have not changed dramatically. It first
+   * uses a listStatus() call on the partition directory to detect files with changed
+   * mtime and fetches their block locations using the getFileBlockLocations() method.
+   * Our benchmarks suggest that the listStatus() call is much faster than listFiles()
+   * (up to ~40x faster in some cases). The initial table load still uses the listFiles()
+   * on the data directory that fetches both the FileStatus as well as BlockLocations in
+   * a single call.
+   */
+  private void refreshFileMetadata(HdfsPartition partition) throws CatalogException {
+    Path partDir = partition.getLocationPath();
+    Preconditions.checkNotNull(partDir);
+    try {
+      FileSystem fs = partDir.getFileSystem(CONF);
+      if (!fs.exists(partDir)) {
+        partition.setFileDescriptors(new ArrayList<FileDescriptor>());
+        return;
+      }
+      if (!FileSystemUtil.supportsStorageIds(fs)) {
+        synthesizeBlockMetadata(fs, partition);
+        return;
+      }
+      // Index the partition file descriptors by their file names for O(1) look ups.
+      ImmutableMap<String, FileDescriptor> fileDescsByName = Maps.uniqueIndex(
+          partition.getFileDescriptors(), new Function<FileDescriptor, String>() {
+            public String apply(FileDescriptor desc) {
+              return desc.getFileName();
+            }
+          });
+      // Iterate through the current snapshot of the partition directory listing to
+      // figure out files that were newly added/modified.
+      List<FileDescriptor> newFileDescs = Lists.newArrayList();
+      int newPartSizeBytes = 0;
+      for (FileStatus fileStatus : fs.listStatus(partDir)) {
+        if (!FileSystemUtil.isValidDataFile(fileStatus)) continue;
+        String fileName = fileStatus.getPath().getName().toString();
+        FileDescriptor fd = fileDescsByName.get(fileName);
+        if (fd == null || partition.isMarkedCached() ||
+            fd.getFileLength() != fileStatus.getLen() ||
+            fd.getModificationTime() != fileStatus.getModificationTime()) {
+          fd = new FileDescriptor(fileName, fileStatus.getLen(),
+              fileStatus.getModificationTime());
+          setFdBlockMetadata(fd,
+              fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()));
+        }
+        newFileDescs.add(fd);
+        newPartSizeBytes += fileStatus.getLen();
+      }
+      partition.setFileDescriptors(newFileDescs);
+      numHdfsFiles_ += newFileDescs.size();
+      totalHdfsBytes_ += newPartSizeBytes;
+    } catch(IOException e) {
+      throw new CatalogException("Error loading block metadata for partition " +
+          partition.toString(), e);
+    }
   }
 
   /**
@@ -772,7 +850,7 @@ public class HdfsTable extends Table {
     LOG.info(String.format(
         "Loading file and block metadata for %s partitions from %s paths: %s",
         partsByPath.size(), locations.size(), getFullName()));
-    for (Path location: locations) { loadBlockMetadata(location, partsByPath); }
+    for (Path location: locations) loadBlockMetadata(location, partsByPath);
     LOG.info(String.format(
         "Loaded file and block metadata for %s partitions from %s paths: %s",
         partsByPath.size(), locations.size(), getFullName()));
@@ -831,7 +909,7 @@ public class HdfsTable extends Table {
       org.apache.hadoop.hive.metastore.api.Partition msPartition)
       throws CatalogException {
     HdfsPartition hdfsPartition = createPartition(storageDescriptor, msPartition);
-    loadMetadataAndDiskIds(hdfsPartition);
+    refreshFileMetadata(hdfsPartition);
     return hdfsPartition;
   }
 
@@ -1119,8 +1197,8 @@ public class HdfsTable extends Table {
     addDefaultPartition(msTbl.getSd());
     HdfsPartition part = createPartition(msTbl.getSd(), null);
     addPartition(part);
-    loadMetadataAndDiskIds(part);
     if (isMarkedCached_) part.markCached();
+    refreshFileMetadata(part);
   }
 
   /**
@@ -1436,7 +1514,7 @@ public class HdfsTable extends Table {
         // WRITE_ONLY the table's access level should be NONE.
         accessLevel_ = TAccessLevel.READ_ONLY;
       }
-      loadMetadataAndDiskIds(partition);
+      refreshFileMetadata(partition);
     }
   }
 
@@ -1492,7 +1570,7 @@ public class HdfsTable extends Table {
     numHdfsFiles_ -= partition.getNumFileDescriptors();
     totalHdfsBytes_ -= partition.getSize();
     Preconditions.checkState(numHdfsFiles_ >= 0 && totalHdfsBytes_ >= 0);
-    loadMetadataAndDiskIds(partition);
+    refreshFileMetadata(partition);
   }
 
   @Override