You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ar...@apache.org on 2018/07/04 14:20:01 UTC

[drill] 01/04: DRILL-6557: Use size in bytes during Hive statistics calculation if present

This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit e047a2bf79df5b2dc4d3abc00c79c6a53903522f
Author: Arina Ielchiieva <ar...@gmail.com>
AuthorDate: Fri Jun 29 18:25:59 2018 +0300

    DRILL-6557: Use size in bytes during Hive statistics calculation if present
    
    1. Check size in bytes presence in stats before fetching input splits and use it if present.
    2. Add log trace suggesting to use ANALYZE command before running queries if statistics is unavailable and Drill had to fetch all input splits.
    3. Minor refactoring /  cleanup in HiveMetadataProvider class.
    
    closes #1357
---
 .../exec/store/hive/HiveMetadataProvider.java      | 232 +++++++++++----------
 1 file changed, 123 insertions(+), 109 deletions(-)

diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 6da6c40..de45dc6 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -19,8 +19,6 @@ package org.apache.drill.exec.store.hive;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Ordering;
 import com.google.common.collect.TreeMultimap;
@@ -50,12 +48,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * Class which provides methods to get metadata of given Hive table selection. It tries to use the stats stored in
@@ -79,50 +79,52 @@ public class HiveMetadataProvider {
   public HiveMetadataProvider(final String userName, final HiveReadEntry hiveReadEntry, final HiveConf hiveConf) {
     this.hiveReadEntry = hiveReadEntry;
     this.ugi = ImpersonationUtil.createProxyUgi(userName);
-    isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
-    partitionInputSplitMap = Maps.newHashMap();
+    this.isPartitionedTable = hiveReadEntry.getTable().getPartitionKeysSize() > 0;
+    this.partitionInputSplitMap = new HashMap<>();
     this.hiveConf = hiveConf;
   }
 
   /**
-   * Return stats for table/partitions in given {@link HiveReadEntry}. If valid stats are available in MetaStore,
-   * return it. Otherwise estimate using the size of the input data.
+   * Return stats for table/partitions in given {@link HiveReadEntry}.
+   * If valid stats are available in MetaStore, return it.
+   * Otherwise estimate using the size of the input data.
    *
    * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this cache object.
-   * @return
-   * @throws IOException
+   * @return hive statistics holder
+   * @throws IOException if was unable to retrieve table statistics
    */
   public HiveStats getStats(final HiveReadEntry hiveReadEntry) throws IOException {
-    final Stopwatch timeGetStats = Stopwatch.createStarted();
+    Stopwatch timeGetStats = Stopwatch.createStarted();
 
-    final HiveTableWithColumnCache table = hiveReadEntry.getTable();
+    HiveTableWithColumnCache table = hiveReadEntry.getTable();
     try {
       if (!isPartitionedTable) {
-        final Properties properties = MetaStoreUtils.getTableMetadata(table);
-        final HiveStats stats = getStatsFromProps(properties);
+        Properties properties = MetaStoreUtils.getTableMetadata(table);
+        HiveStats stats = HiveStats.getStatsFromProps(properties);
         if (stats.valid()) {
           return stats;
         }
 
-        // estimate the stats from the InputSplits.
-        return getStatsEstimateFromInputSplits(getTableInputSplits());
+        return stats.getSizeInBytes() > 0 ? estimateStatsFromBytes(stats.getSizeInBytes()) :
+            estimateStatsFromInputSplits(getTableInputSplits());
+
       } else {
-        final HiveStats aggStats = new HiveStats(0, 0);
-        for(HivePartition partition : hiveReadEntry.getPartitions()) {
-          final Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
-          HiveStats stats = getStatsFromProps(properties);
+        HiveStats aggStats = new HiveStats(0, 0);
+        for (HivePartition partition : hiveReadEntry.getPartitions()) {
+          Properties properties = HiveUtilities.getPartitionMetadata(partition, table);
+          HiveStats stats = HiveStats.getStatsFromProps(properties);
 
           if (!stats.valid()) {
-            // estimate the stats from InputSplits
-            stats = getStatsEstimateFromInputSplits(getPartitionInputSplits(partition));
+            stats = stats.getSizeInBytes() > 0 ? estimateStatsFromBytes(stats.getSizeInBytes()) :
+                estimateStatsFromInputSplits(getPartitionInputSplits(partition));
           }
           aggStats.add(stats);
         }
 
         return aggStats;
       }
-    } catch (final Exception e) {
-      throw new IOException("Failed to get numRows from HiveTable", e);
+    } catch (Exception e) {
+      throw new IOException("Failed to get number of rows and total size from HiveTable", e);
     } finally {
       logger.debug("Took {} µs to get stats from {}.{}", timeGetStats.elapsed(TimeUnit.NANOSECONDS) / 1000,
           table.getDbName(), table.getTableName());
@@ -130,7 +132,7 @@ public class HiveMetadataProvider {
   }
 
   /** Helper method which return InputSplits for non-partitioned table */
-  private List<LogicalInputSplit> getTableInputSplits() throws Exception {
+  private List<LogicalInputSplit> getTableInputSplits() {
     Preconditions.checkState(!isPartitionedTable, "Works only for non-partitioned tables");
     if (tableInputSplits != null) {
       return tableInputSplits;
@@ -145,7 +147,7 @@ public class HiveMetadataProvider {
   /** Helper method which returns the InputSplits for given partition. InputSplits are cached to speed up subsequent
    * metadata cache requests for the same partition(s).
    */
-  private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition partition) throws Exception {
+  private List<LogicalInputSplit> getPartitionInputSplits(final HivePartition partition) {
     if (partitionInputSplitMap.containsKey(partition)) {
       return partitionInputSplitMap.get(partition);
     }
@@ -164,18 +166,17 @@ public class HiveMetadataProvider {
    * @param hiveReadEntry Subset of the {@link HiveReadEntry} used when creating this object.
    * @return list of logically grouped input splits
    */
-  public List<LogicalInputSplit> getInputSplits(final HiveReadEntry hiveReadEntry) {
-    final Stopwatch timeGetSplits = Stopwatch.createStarted();
+  public List<LogicalInputSplit> getInputSplits(HiveReadEntry hiveReadEntry) {
+    Stopwatch timeGetSplits = Stopwatch.createStarted();
     try {
       if (!isPartitionedTable) {
         return getTableInputSplits();
       }
 
-      final List<LogicalInputSplit> splits = Lists.newArrayList();
-      for (HivePartition p : hiveReadEntry.getPartitions()) {
-        splits.addAll(getPartitionInputSplits(p));
-      }
-      return splits;
+      return hiveReadEntry.getPartitions().stream()
+          .flatMap(p -> getPartitionInputSplits(p).stream())
+          .collect(Collectors.toList());
+
     } catch (final Exception e) {
       logger.error("Failed to get InputSplits", e);
       throw new DrillRuntimeException("Failed to get InputSplits", e);
@@ -190,63 +191,44 @@ public class HiveMetadataProvider {
    *
    * @param hiveReadEntry {@link HiveReadEntry} containing the input table and/or partitions.
    */
-  protected List<String> getInputDirectories(final HiveReadEntry hiveReadEntry) {
+  protected List<String> getInputDirectories(HiveReadEntry hiveReadEntry) {
     if (isPartitionedTable) {
-      final List<String> inputs = Lists.newArrayList();
-      for(Partition p : hiveReadEntry.getPartitions()) {
-        inputs.add(p.getSd().getLocation());
-      }
-      return inputs;
+      return hiveReadEntry.getPartitions().stream()
+          .map(p -> p.getSd().getLocation())
+          .collect(Collectors.toList());
     }
 
     return Collections.singletonList(hiveReadEntry.getTable().getSd().getLocation());
   }
 
   /**
-   * Get the stats from table properties. If not found -1 is returned for each stats field.
-   * CAUTION: stats may not be up-to-date with the underlying data. It is always good to run the ANALYZE command on
-   * Hive table to have up-to-date stats.
+   * Estimate the stats from the given list of logically grouped input splits.
    *
-   * @param properties the source of table stats
-   * @return {@link HiveStats} instance with rows number and size in bytes from specified properties
+   * @param inputSplits list of logically grouped input splits
+   * @return hive stats with numRows and totalSizeInBytes
    */
-  private HiveStats getStatsFromProps(final Properties properties) {
-    long numRows = -1;
-    long sizeInBytes = -1;
-    try {
-      final String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
-      if (numRowsProp != null) {
-          numRows = Long.valueOf(numRowsProp);
-      }
-
-      final String sizeInBytesProp = properties.getProperty(StatsSetupConst.TOTAL_SIZE);
-      if (sizeInBytesProp != null) {
-        sizeInBytes = Long.valueOf(sizeInBytesProp);
-      }
-    } catch (final NumberFormatException e) {
-      logger.error("Failed to parse Hive stats in metastore.", e);
-      // continue with the defaults.
+  private HiveStats estimateStatsFromInputSplits(List<LogicalInputSplit> inputSplits) throws IOException {
+    logger.trace("Collecting stats based on input splits size. " +
+        "It means that we might have fetched all input splits before applying any possible optimizations (ex: partition pruning). " +
+        "Consider using ANALYZE command on Hive table to collect statistics before running queries.");
+    long sizeInBytes = 0;
+    for (LogicalInputSplit split : inputSplits) {
+      sizeInBytes += split.getLength();
     }
-
-    return new HiveStats(numRows, sizeInBytes);
+    return estimateStatsFromBytes(sizeInBytes);
   }
 
   /**
-   * Estimate the stats from the given list of logically grouped input splits.
+   * Estimates Hive stats based on give size in bytes.
    *
-   * @param inputSplits list of logically grouped input splits
-   * @return hive stats usually numRows and totalSizeInBytes which used
+   * @param sizeInBytes size in bytes
+   * @return hive stats with numRows and totalSizeInBytes
    */
-  private HiveStats getStatsEstimateFromInputSplits(final List<LogicalInputSplit> inputSplits) throws IOException {
-    long data = 0;
-    for (final LogicalInputSplit split : inputSplits) {
-      data += split.getLength();
-    }
-
-    long numRows = data / RECORD_SIZE;
+  private HiveStats estimateStatsFromBytes(long sizeInBytes) {
+    long numRows = sizeInBytes / RECORD_SIZE;
     // if the result of division is zero and data size > 0, estimate to one row
-    numRows = numRows == 0 && data > 0 ? 1 : numRows;
-    return new HiveStats(numRows, data);
+    numRows = numRows == 0 && sizeInBytes > 0 ? 1 : numRows;
+    return new HiveStats(numRows, sizeInBytes);
   }
 
   /**
@@ -262,36 +244,34 @@ public class HiveMetadataProvider {
   private List<LogicalInputSplit> splitInputWithUGI(final Properties properties, final StorageDescriptor sd, final Partition partition) {
     watch.start();
     try {
-      return ugi.doAs(new PrivilegedExceptionAction<List<LogicalInputSplit>>() {
-        public List<LogicalInputSplit> run() throws Exception {
-          final List<LogicalInputSplit> splits = Lists.newArrayList();
-          final JobConf job = new JobConf(hiveConf);
-          HiveUtilities.addConfToJob(job, properties);
-          HiveUtilities.verifyAndAddTransactionalProperties(job, sd);
-          job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, hiveReadEntry.getTable()));
-          final Path path = new Path(sd.getLocation());
-          final FileSystem fs = path.getFileSystem(job);
-          if (fs.exists(path)) {
-            FileInputFormat.addInputPath(job, path);
-            final InputFormat<?, ?> format = job.getInputFormat();
-            InputSplit[] inputSplits = format.getSplits(job, 1);
-
-            // if current table with text input format and has header / footer,
-            // we need to make sure that splits of the same file are grouped together
-            if (TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
-                HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
-              Multimap<Path, FileSplit> inputSplitMultimap = transformFileSplits(inputSplits);
-              for (Collection<FileSplit> logicalInputSplit : inputSplitMultimap.asMap().values()) {
-                splits.add(new LogicalInputSplit(logicalInputSplit, partition));
-              }
-            } else {
-              for (final InputSplit split : inputSplits) {
-                splits.add(new LogicalInputSplit(split, partition));
-              }
+      return ugi.doAs((PrivilegedExceptionAction<List<LogicalInputSplit>>) () -> {
+        final List<LogicalInputSplit> splits = new ArrayList<>();
+        final JobConf job = new JobConf(hiveConf);
+        HiveUtilities.addConfToJob(job, properties);
+        HiveUtilities.verifyAndAddTransactionalProperties(job, sd);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, hiveReadEntry.getTable()));
+        final Path path = new Path(sd.getLocation());
+        final FileSystem fs = path.getFileSystem(job);
+        if (fs.exists(path)) {
+          FileInputFormat.addInputPath(job, path);
+          final InputFormat<?, ?> format = job.getInputFormat();
+          InputSplit[] inputSplits = format.getSplits(job, 1);
+
+          // if current table with text input format and has header / footer,
+          // we need to make sure that splits of the same file are grouped together
+          if (TextInputFormat.class.getCanonicalName().equals(sd.getInputFormat()) &&
+              HiveUtilities.hasHeaderOrFooter(hiveReadEntry.getTable())) {
+            Multimap<Path, FileSplit> inputSplitMultimap = transformFileSplits(inputSplits);
+            for (Collection<FileSplit> logicalInputSplit : inputSplitMultimap.asMap().values()) {
+              splits.add(new LogicalInputSplit(logicalInputSplit, partition));
+            }
+          } else {
+            for (final InputSplit split : inputSplits) {
+              splits.add(new LogicalInputSplit(split, partition));
             }
           }
-          return splits;
         }
+        return splits;
       });
     } catch (final InterruptedException | IOException e) {
       final String errMsg = String.format("Failed to create input splits: %s", e.getMessage());
@@ -320,13 +300,8 @@ public class HiveMetadataProvider {
    * @return multimap where key is file path and value is group of ordered file splits
    */
   private Multimap<Path, FileSplit> transformFileSplits(InputSplit[] inputSplits) {
-    Multimap<Path, FileSplit> inputSplitGroups = TreeMultimap.create(Ordering.<Path>natural(),
-        new Comparator<FileSplit>() {
-      @Override
-      public int compare(FileSplit f1, FileSplit f2) {
-        return Long.compare(f1.getStart(), f2.getStart());
-      }
-    });
+    Multimap<Path, FileSplit> inputSplitGroups = TreeMultimap.create(
+        Ordering.natural(), Comparator.comparingLong(FileSplit::getStart));
 
     for (InputSplit inputSplit : inputSplits) {
       FileSplit fileSplit = (FileSplit) inputSplit;
@@ -413,16 +388,53 @@ public class HiveMetadataProvider {
     }
   }
 
-  /** Contains stats. Currently only numRows and totalSizeInBytes are used. */
+  /**
+   * Contains stats. Currently only numRows and totalSizeInBytes are used.
+   */
   public static class HiveStats {
+
+    private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStats.class);
+
     private long numRows;
     private long sizeInBytes;
 
-    public HiveStats(final long numRows, final long sizeInBytes) {
+    public HiveStats(long numRows, long sizeInBytes) {
       this.numRows = numRows;
       this.sizeInBytes = sizeInBytes;
     }
 
+    /**
+     * Get the stats from table properties. If not found -1 is returned for each stats field.
+     * CAUTION: stats may not be up-to-date with the underlying data. It is always good to run the ANALYZE command on
+     * Hive table to have up-to-date stats.
+     *
+     * @param properties the source of table stats
+     * @return {@link HiveStats} instance with rows number and size in bytes from specified properties
+     */
+    public static HiveStats getStatsFromProps(Properties properties) {
+      long numRows = -1;
+      long sizeInBytes = -1;
+      try {
+        String sizeInBytesProp = properties.getProperty(StatsSetupConst.TOTAL_SIZE);
+        if (sizeInBytesProp != null) {
+          sizeInBytes = Long.valueOf(sizeInBytesProp);
+        }
+
+        String numRowsProp = properties.getProperty(StatsSetupConst.ROW_COUNT);
+        if (numRowsProp != null) {
+          numRows = Long.valueOf(numRowsProp);
+        }
+      } catch (NumberFormatException e) {
+        logger.error("Failed to parse Hive stats from metastore.", e);
+        // continue with the defaults.
+      }
+
+      HiveStats hiveStats = new HiveStats(numRows, sizeInBytes);
+      logger.trace("Obtained Hive stats from properties: {}.", hiveStats);
+      return hiveStats;
+    }
+
+
     public long getNumRows() {
       return numRows;
     }
@@ -431,7 +443,9 @@ public class HiveMetadataProvider {
       return sizeInBytes;
     }
 
-    /** Both numRows and sizeInBytes are expected to be greater than 0 for stats to be valid */
+    /**
+     * Both numRows and sizeInBytes are expected to be greater than 0 for stats to be valid
+     */
     public boolean valid() {
       return numRows > 0 && sizeInBytes > 0;
     }