Posted to commits@impala.apache.org by ta...@apache.org on 2018/08/01 17:24:28 UTC

[3/6] impala git commit: IMPALA-7234: Improve memory estimates produced by the Planner

IMPALA-7234: Improve memory estimates produced by the Planner

Previously, the planner used getMajorityFormat() to estimate the
memory requirements of a table's partitions. Additionally, before
IMPALA-6625 was merged, the majority format for a multi-format
table with no numerical majority was calculated using a HashMap,
thus producing non-deterministic results. This change ensures that
the memory estimate is deterministic and always based on the
partition that has the maximum memory requirement.
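
A minimal sketch of the new estimation rule (an illustration only,
with stand-in names and made-up costs; the real per-format costs live
in HdfsScanNode and HdfsTableSink in the diff below):

    import java.util.EnumSet;
    import java.util.Set;

    enum Format { PARQUET, ORC, TEXT, AVRO }

    class MaxFormatEstimate {
      // Stand-in per-partition cost for a format; see
      // getPerPartitionMemReq() in the diff for the real values.
      static long cost(Format f) {
        return (f == Format.PARQUET || f == Format.ORC)
            ? 1024L * 1024L * 1024L  // conservative columnar estimate
            : 100L * 1024L;          // small buffer for row formats
      }

      // Deterministic: the estimate is the maximum over the set of
      // formats present, not a majority vote whose tie-breaking once
      // depended on HashMap iteration order.
      static long estimate(Set<Format> formats) {
        long max = 0;
        for (Format f : formats) max = Math.max(max, cost(f));
        return max;
      }

      public static void main(String[] args) {
        // Mixed table: Parquet dominates, so the estimate is 1GB.
        System.out.println(estimate(EnumSet.of(Format.TEXT, Format.PARQUET)));
      }
    }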

Testing: Ran all PlannerTests. Also, modified the expected plans of
scans over multiple partitions to verify that the memory estimate
produced corresponds to the partition with the maximum requirement.

Change-Id: I0666ae3d45fbd8615d3fa9a8626ebd29cf94fb4b
Reviewed-on: http://gerrit.cloudera.org:8080/11001
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/672a271f
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/672a271f
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/672a271f

Branch: refs/heads/master
Commit: 672a271fd0966bd77f38eda9b6f1e768415bac04
Parents: de4bdb0
Author: poojanilangekar <po...@cloudera.com>
Authored: Thu Jul 19 13:24:41 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Tue Jul 31 21:01:57 2018 +0000

----------------------------------------------------------------------
 .../apache/impala/catalog/FeCatalogUtils.java   | 31 +++--------
 .../org/apache/impala/catalog/FeFsTable.java    |  9 ++--
 .../org/apache/impala/catalog/HdfsTable.java    |  6 +--
 .../impala/catalog/local/LocalFsTable.java      | 12 ++---
 .../org/apache/impala/planner/HdfsScanNode.java | 30 ++++++-----
 .../apache/impala/planner/HdfsTableSink.java    | 56 +++++++++++---------
 .../PlannerTest/parquet-filtering-disabled.test |  2 +-
 .../queries/PlannerTest/parquet-filtering.test  |  2 +-
 8 files changed, 70 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
index 4f1d68d..2072228 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeCatalogUtils.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -45,6 +46,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * Static utility functions shared between FeCatalog implementations.
@@ -294,32 +296,15 @@ public abstract class FeCatalogUtils {
   }
 
   /**
-   * Return the most commonly-used file format within a set of partitions.
+   * Return the set of all file formats used in the collection of partitions.
    */
-  public static HdfsFileFormat getMajorityFormat(
+  public static Set<HdfsFileFormat> getFileFormats(
       Iterable<? extends FeFsPartition> partitions) {
-    Map<HdfsFileFormat, Integer> numPartitionsByFormat = Maps.newTreeMap();
-    for (FeFsPartition partition: partitions) {
-      HdfsFileFormat format = partition.getInputFormatDescriptor().getFileFormat();
-      Integer numPartitions = numPartitionsByFormat.get(format);
-      if (numPartitions == null) {
-        numPartitions = Integer.valueOf(1);
-      } else {
-        numPartitions = Integer.valueOf(numPartitions.intValue() + 1);
-      }
-      numPartitionsByFormat.put(format, numPartitions);
-    }
-
-    int maxNumPartitions = Integer.MIN_VALUE;
-    HdfsFileFormat majorityFormat = null;
-    for (Map.Entry<HdfsFileFormat, Integer> entry: numPartitionsByFormat.entrySet()) {
-      if (entry.getValue().intValue() > maxNumPartitions) {
-        majorityFormat = entry.getKey();
-        maxNumPartitions = entry.getValue().intValue();
-      }
+    Set<HdfsFileFormat> fileFormats = Sets.newHashSet();
+    for (FeFsPartition partition : partitions) {
+      fileFormats.add(partition.getFileFormat());
     }
-    Preconditions.checkNotNull(majorityFormat);
-    return majorityFormat;
+    return fileFormats;
   }
 
   public static THdfsPartition fsPartitionToThrift(FeFsPartition part,

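As a quick usage sketch of the new helper (the caller code here is
hypothetical, but the names match the diff above): consumers receive
the full set of formats and no longer depend on any iteration order:

    Set<HdfsFileFormat> formats = FeCatalogUtils.getFileFormats(partitions);
    // e.g. decide whether any partition needs large write buffers.
    boolean needsLargeBuffers = formats.contains(HdfsFileFormat.PARQUET);
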
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
index 86b41f0..891bf62 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsTable.java
@@ -95,12 +95,11 @@ public interface FeFsTable extends FeTable {
   boolean isAvroTable();
 
   /**
-   * @return the format that the majority of partitions in this table use
-   *
-   * TODO(todd): this API needs to be removed, since it depends on loading all
-   * partitions even when scanning few.
+   * @return the set of file formats that the partitions in this table use.
+   * This API is only used by the TableSink to write out partitions. It
+   * should not be used for scanning.
    */
-  public HdfsFileFormat getMajorityFormat();
+  public Set<HdfsFileFormat> getFileFormats();
 
   /**
    * Return true if the table may be written to.

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 0f30407..d66ddd2 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -1808,14 +1808,14 @@ public class HdfsTable extends Table implements FeFsTable {
   public ListMap<TNetworkAddress> getHostIndex() { return hostIndex_; }
 
   /**
-   * Returns the file format that the majority of partitions are stored in.
+   * Returns the set of file formats that the partitions are stored in.
    */
-  public HdfsFileFormat getMajorityFormat() {
+  public Set<HdfsFileFormat> getFileFormats() {
     // In the case that we have no partitions added to the table yet, it's
     // important to add the "prototype" partition as a fallback.
     Iterable<HdfsPartition> partitionsToConsider = Iterables.concat(
         partitionMap_.values(), Collections.singleton(prototypePartition_));
-    return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
+    return FeCatalogUtils.getFileFormats(partitionsToConsider);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 82af240..7753faa 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -178,15 +178,9 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
   }
 
   @Override
-  public HdfsFileFormat getMajorityFormat() {
-    // TODO(todd): can we avoid loading all partitions here? this is called
-    // for any INSERT query, even if the partition is specified.
-    Collection<? extends FeFsPartition> parts = FeCatalogUtils.loadAllPartitions(this);
-    // In the case that we have no partitions added to the table yet, it's
-    // important to add the "prototype" partition as a fallback.
-    Iterable<FeFsPartition> partitionsToConsider = Iterables.concat(
-        parts, Collections.singleton(createPrototypePartition()));
-    return FeCatalogUtils.getMajorityFormat(partitionsToConsider);
+  public Set<HdfsFileFormat> getFileFormats() {
+    // Needed by HdfsTableSink.
+    throw new UnsupportedOperationException("TODO: implement me");
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 55bf301..151cbe0 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1366,18 +1366,24 @@ public class HdfsScanNode extends ScanNode {
       columnReservations = computeMinColumnMemReservations();
     }
 
-    int perHostScanRanges;
-    HdfsFileFormat majorityFormat = FeCatalogUtils.getMajorityFormat(partitions_);
-    if (majorityFormat == HdfsFileFormat.PARQUET
-        || majorityFormat == HdfsFileFormat.ORC) {
-      Preconditions.checkNotNull(columnReservations);
-      // For the purpose of this estimation, the number of per-host scan ranges for
-      // Parquet/ORC files are equal to the number of columns read from the file. I.e.
-      // excluding partition columns and columns that are populated from file metadata.
-      perHostScanRanges = columnReservations.size();
-    } else {
-      perHostScanRanges = (int) Math.ceil(
-          ((double) scanRangeSize / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
+    int perHostScanRanges = 0;
+    for (HdfsFileFormat format : fileFormats_) {
+      int partitionScanRange = 0;
+      if ((format == HdfsFileFormat.PARQUET) || (format == HdfsFileFormat.ORC)) {
+        Preconditions.checkNotNull(columnReservations);
+        // For the purpose of this estimation, the number of per-host scan ranges for
+        // Parquet/ORC files is equal to the number of columns read from the file, i.e.
+        // excluding partition columns and columns that are populated from file metadata.
+        partitionScanRange = columnReservations.size();
+      } else {
+        partitionScanRange = (int) Math.ceil(
+            ((double) scanRangeSize / (double) numNodes_) * SCAN_RANGE_SKEW_FACTOR);
+      }
+      // From a resource management perspective, we want to conservatively estimate
+      // memory consumption based on the partition with the highest memory requirements.
+      if (partitionScanRange > perHostScanRanges) {
+        perHostScanRanges = partitionScanRange;
+      }
     }
 
     // The non-MT scan node requires at least one scanner thread.

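To make the hunk above concrete with made-up numbers: for a scan of a
mixed Parquet/text table that reads 8 columns (so
columnReservations.size() == 8), with scanRangeSize = 24, numNodes_ = 3
and a skew factor of 1.2, the Parquet branch yields 8 per-host scan
ranges and the text branch yields ceil((24 / 3) * 1.2) = 10;
perHostScanRanges becomes the maximum of the two, 10, regardless of
which format holds the majority of partitions.
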
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
index 7426641..48e7c62 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java
@@ -18,12 +18,14 @@
 package org.apache.impala.planner;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.impala.analysis.DescriptorTable;
 import org.apache.impala.analysis.Expr;
 import org.apache.impala.catalog.FeFsTable;
 import org.apache.impala.catalog.FeTable;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
@@ -31,8 +33,12 @@ import org.apache.impala.thrift.THdfsTableSink;
 import org.apache.impala.thrift.TQueryOptions;
 import org.apache.impala.thrift.TTableSink;
 import org.apache.impala.thrift.TTableSinkType;
+
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * Sink for inserting into filesystem-backed tables.
@@ -53,6 +59,10 @@ public class HdfsTableSink extends TableSink {
   // be opened, written, and closed one by one.
   protected final boolean inputIsClustered_;
 
+  private static final Set<HdfsFileFormat> SUPPORTED_FILE_FORMATS = ImmutableSet.of(
+      HdfsFileFormat.PARQUET, HdfsFileFormat.TEXT, HdfsFileFormat.LZO_TEXT,
+      HdfsFileFormat.RC_FILE, HdfsFileFormat.SEQUENCE_FILE, HdfsFileFormat.AVRO);
+
   // Stores the indices into the list of non-clustering columns of the target table that
   // are stored in the 'sort.columns' table property. This is sent to the backend to
   // populate the RowGroup::sorting_columns list in parquet files.
@@ -70,9 +80,6 @@ public class HdfsTableSink extends TableSink {
 
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    FeFsTable table = (FeFsTable) targetTable_;
-    // TODO: Estimate the memory requirements more accurately by partition type.
-    HdfsFileFormat format = table.getMajorityFormat();
     PlanNode inputNode = fragment_.getPlanRoot();
     int numInstances = fragment_.getNumInstances(queryOptions.getMt_dop());
     // Compute the per-instance number of partitions, taking the number of nodes
@@ -82,7 +89,11 @@ public class HdfsTableSink extends TableSink {
     if (numPartitionsPerInstance == -1) {
       numPartitionsPerInstance = DEFAULT_NUM_PARTITIONS;
     }
-    long perPartitionMemReq = getPerPartitionMemReq(format);
+
+    HdfsTable table = (HdfsTable) targetTable_;
+    // TODO: Estimate the memory requirements more accurately by partition type.
+    Set<HdfsFileFormat> formats = table.getFileFormats();
+    long perPartitionMemReq = getPerPartitionMemReq(formats);
 
     long perInstanceMemEstimate;
     // The estimate is based purely on the per-partition mem req if the input cardinality_
@@ -105,28 +116,25 @@ public class HdfsTableSink extends TableSink {
 
   /**
    * Returns the per-partition memory requirement for inserting into the given
-   * file format.
+   * set of file formats.
    */
-  private long getPerPartitionMemReq(HdfsFileFormat format) {
-    switch (format) {
-      case PARQUET:
-        // Writing to a Parquet table requires up to 1GB of buffer per partition.
-        // TODO: The per-partition memory requirement is configurable in the QueryOptions.
-        return 1024L * 1024L * 1024L;
-      case TEXT:
-      case LZO_TEXT:
-        // Very approximate estimate of amount of data buffered.
-        return 100L * 1024L;
-      case RC_FILE:
-      case SEQUENCE_FILE:
-      case AVRO:
-        // Very approximate estimate of amount of data buffered.
-        return 100L * 1024L;
-      default:
-        Preconditions.checkState(false, "Unsupported TableSink format " +
-            format.toString());
+  private long getPerPartitionMemReq(Set<HdfsFileFormat> formats) {
+    Set<HdfsFileFormat> unsupportedFormats =
+        Sets.difference(formats, SUPPORTED_FILE_FORMATS);
+    if (!unsupportedFormats.isEmpty()) {
+      Preconditions.checkState(false,
+          "Unsupported TableSink format(s): " + Joiner.on(',').join(unsupportedFormats));
     }
-    return 0;
+    if (formats.contains(HdfsFileFormat.PARQUET)) {
+      // Writing to a Parquet partition requires up to 1GB of buffer. From a resource
+      // management perspective, even if there are non-Parquet partitions, we want to
+      // be conservative and make a high memory estimate.
+      return 1024L * 1024L * 1024L;
+    }
+
+    // For all other supported formats (TEXT, LZO_TEXT, RC_FILE, SEQUENCE_FILE & AVRO),
+    // 100KB is a very approximate estimate of the amount of data buffered.
+    return 100L * 1024L;
   }
 
   @Override

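A short illustration of the new sink estimate (a hypothetical call,
since the method is private to HdfsTableSink; the format set would
come from FeFsTable.getFileFormats()):

    Set<HdfsFileFormat> formats =
        ImmutableSet.of(HdfsFileFormat.PARQUET, HdfsFileFormat.TEXT);
    // Parquet is present, so the conservative 1GB estimate wins even
    // though the text partitions alone would only need ~100KB.
    long perPartitionMemReq = getPerPartitionMemReq(formats);
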
http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
index 4cccd06..afdb8f2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
@@ -306,6 +306,6 @@ PLAN-ROOT SINK
      partitions: 0/4 rows=unavailable
      columns missing stats: id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col
    extrapolated-rows=disabled max-scan-range-rows=unavailable
-   mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+   mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=80B cardinality=unavailable
 ====

http://git-wip-us.apache.org/repos/asf/impala/blob/672a271f/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 8df8716..64bf5f2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -528,7 +528,7 @@ PLAN-ROOT SINK
    extrapolated-rows=disabled max-scan-range-rows=unavailable
    parquet statistics predicates: bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), date_string_col > '1993-10-01'
    parquet dictionary predicates: bool_col, bigint_col < 5000, double_col > 100.00, float_col > 50.00, id = 1, tinyint_col < 50, string_col IN ('aaaa', 'bbbb', 'cccc'), smallint_col IN (1, 2, 3, 4, 5), mod(int_col, 2) = 1, timestamp_cmp(timestamp_col, TIMESTAMP '2016-11-20 00:00:00') = 1, date_string_col > '1993-10-01'
-   mem-estimate=32.00MB mem-reservation=88.00KB thread-reservation=1
+   mem-estimate=128.00MB mem-reservation=88.00KB thread-reservation=1
    tuple-ids=0 row-size=80B cardinality=unavailable
 ====
 # Test a variety of predicates on a mixed format table.