You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2021/03/03 00:38:47 UTC

[impala] 05/06: IMPALA-10524: Changes to HdfsPartition for third party extensions.

This is an automated email from the ASF dual-hosted git repository.

tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 3554b0752d4be142dabb82b6b751323cde5323d8
Author: Steve Carlin <sc...@cloudera.com>
AuthorDate: Sat Feb 20 07:10:43 2021 -0800

    IMPALA-10524: Changes to HdfsPartition for third party extensions.
    
    Some changes are needed to HdfsPartition and related classes to
    support third-party extensions. These changes include:
    
    - A protected constructor which will allow a subclass to instantiate
      HdfsPartition using its own Builder.
    - Various access-modifier changes (mostly widening private, protected
      or package-private members to public, and making some helper
      methods static) so that third-party extensions can use them.
    - A new getHostIndex() method on FeFsPartition, which allows a
      subclass to override how the host index is retrieved.
    - A new default method getFileSystem() on FeFsPartition, which
      allows a third-party extension to override how the FileSystem
      is obtained from a partition object.
    
    Change-Id: I5a792642f27228118ac8f2e8ef98e8ba7aee4a46
    Reviewed-on: http://gerrit.cloudera.org:8080/17092
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 .../org/apache/impala/analysis/AnalyticWindow.java |  5 ++--
 .../org/apache/impala/analysis/StatementBase.java  |  5 ++--
 .../org/apache/impala/catalog/FeFsPartition.java   | 17 ++++++++++++
 .../org/apache/impala/catalog/HdfsPartition.java   | 27 +++++++++++++++++-
 .../catalog/HdfsPartitionLocationCompressor.java   |  2 +-
 .../impala/catalog/local/LocalFsPartition.java     |  5 ++++
 .../apache/impala/planner/AnalyticEvalNode.java    |  2 +-
 .../impala/planner/CardinalityCheckNode.java       |  2 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  2 +-
 .../org/apache/impala/planner/KuduScanNode.java    |  2 +-
 .../java/org/apache/impala/planner/PlanNode.java   |  2 +-
 .../java/org/apache/impala/planner/Planner.java    | 32 ++++++++++++++--------
 .../java/org/apache/impala/planner/SortNode.java   |  2 +-
 .../org/apache/impala/planner/SubplanNode.java     |  2 +-
 14 files changed, 82 insertions(+), 25 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
index 7b460f9..48b130c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
@@ -42,7 +42,7 @@ public class AnalyticWindow {
       new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
       new Boundary(BoundaryType.CURRENT_ROW, null));
 
-  enum Type {
+  public enum Type {
     ROWS("ROWS"),
     RANGE("RANGE");
 
@@ -131,8 +131,7 @@ public class AnalyticWindow {
       this(type, e, null);
     }
 
-    // c'tor used by clone()
-    private Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) {
+    public Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) {
       Preconditions.checkState(
         (type.isOffset() && e != null)
         || (!type.isOffset() && e == null));
diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
index e0fb2de..4fb2d43 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
@@ -200,8 +200,9 @@ public abstract class StatementBase extends StmtNode {
    * If strictDecimal is true, only consider casts that result in no loss of information
    * when casting between decimal types.
    */
-  protected Expr checkTypeCompatibility(String dstTableName, Column dstCol, Expr srcExpr,
-      boolean strictDecimal, Expr widestTypeSrcExpr) throws AnalysisException {
+  public static Expr checkTypeCompatibility(String dstTableName, Column dstCol,
+      Expr srcExpr, boolean strictDecimal, Expr widestTypeSrcExpr)
+      throws AnalysisException {
     Type dstColType = dstCol.getType();
     Type srcExprType = srcExpr.getType();
 
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index 4b66148..8ec49bb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
@@ -16,18 +16,23 @@
 // under the License.
 package org.apache.impala.catalog;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.THdfsPartitionLocation;
+import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionStats;
+import org.apache.impala.util.ListMap;
 
 /**
  * Frontend interface for interacting with a single filesystem-based partition.
@@ -50,6 +55,11 @@ public interface FeFsPartition {
   FeFsTable getTable();
 
   /**
+   * @return ListMap<hostIndex> from partition's table.
+   */
+  ListMap<TNetworkAddress> getHostIndex();
+
+  /**
    * @return the FsType that this partition is stored on
    */
   FileSystemUtil.FsType getFsType();
@@ -95,6 +105,13 @@ public interface FeFsPartition {
   Path getLocationPath();
 
   /**
+   * @return the FileSystem of this partition
+   */
+  default FileSystem getFileSystem(Configuration conf) throws IOException {
+    return getLocationPath().getFileSystem(conf);
+  }
+
+  /**
    * @return the HDFS permissions Impala has to this partition's directory - READ_ONLY,
    * READ_WRITE, etc.
    */
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 3d3b0c3..28f1f3d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -682,7 +682,29 @@ public class HdfsPartition extends CatalogObjectImpl
   // it's not used in coordinators.
   private final InFlightEvents inFlightEvents_;
 
-  private HdfsPartition(HdfsTable table, long id, long prevId, String partName,
+  /**
+   * Constructor.  Needed for third party extensions that want to use their own builder
+   * to construct the object.
+   */
+  protected HdfsPartition(HdfsTable table, long prevId, String partName,
+      List<LiteralExpr> partitionKeyValues, HdfsStorageDescriptor fileFormatDescriptor,
+      @Nonnull ImmutableList<byte[]> encodedFileDescriptors,
+      ImmutableList<byte[]> encodedInsertFileDescriptors,
+      ImmutableList<byte[]> encodedDeleteFileDescriptors,
+      HdfsPartitionLocationCompressor.Location location,
+      boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> hmsParameters,
+      CachedHmsPartitionDescriptor cachedMsPartitionDescriptor,
+      byte[] partitionStats, boolean hasIncrementalStats, long numRows, long writeId,
+      InFlightEvents inFlightEvents) {
+    this(table, partitionIdCounter_.getAndIncrement(), prevId, partName,
+        partitionKeyValues, fileFormatDescriptor, encodedFileDescriptors,
+        encodedInsertFileDescriptors, encodedDeleteFileDescriptors, location,
+        isMarkedCached, accessLevel, hmsParameters, cachedMsPartitionDescriptor,
+        partitionStats, hasIncrementalStats, numRows, writeId, inFlightEvents);
+  }
+
+
+  protected HdfsPartition(HdfsTable table, long id, long prevId, String partName,
       List<LiteralExpr> partitionKeyValues, HdfsStorageDescriptor fileFormatDescriptor,
       @Nonnull ImmutableList<byte[]> encodedFileDescriptors,
       ImmutableList<byte[]> encodedInsertFileDescriptors,
@@ -786,6 +808,9 @@ public class HdfsPartition extends CatalogObjectImpl
   @Override // FeFsPartition
   public HdfsTable getTable() { return table_; }
 
+  @Override // FeFsPartition
+  public ListMap<TNetworkAddress> getHostIndex() { return table_.getHostIndex(); }
+
   @Override
   public FileSystemUtil.FsType getFsType() {
     Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
index 939bcf6..2ba6414 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
@@ -35,7 +35,7 @@ import com.google.common.base.Preconditions;
  * prefixes, like table locations.
  *
  */
-class HdfsPartitionLocationCompressor {
+public class HdfsPartitionLocationCompressor {
   int numClusteringColumns_;
 
   // A bi-directional map between partition location prefixes and their compressed
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 1bed832..575e01a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -39,7 +39,9 @@ import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.compat.MetastoreShim;
 import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.THdfsPartitionLocation;
+import org.apache.impala.thrift.TNetworkAddress;
 import org.apache.impala.thrift.TPartitionStats;
+import org.apache.impala.util.ListMap;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -104,6 +106,9 @@ public class LocalFsPartition implements FeFsPartition {
     return table_;
   }
 
+  @Override // FeFsPartition
+  public ListMap<TNetworkAddress> getHostIndex() { return table_.getHostIndex(); }
+
   @Override
   public FileSystemUtil.FsType getFsType() {
     Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 31463cf..252df3b 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -251,7 +251,7 @@ public class AnalyticEvalNode extends PlanNode {
   }
 
   @Override
-  protected void computeStats(Analyzer analyzer) {
+  public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     cardinality_ = getChild(0).cardinality_;
     cardinality_ = capCardinalityAtLimit(cardinality_);
diff --git a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
index f7629a1..00cd67f 100644
--- a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
 public class CardinalityCheckNode extends PlanNode {
   private final String displayStatement_;
 
-  protected CardinalityCheckNode(PlanNodeId id, PlanNode child, String displayStmt) {
+  public CardinalityCheckNode(PlanNodeId id, PlanNode child, String displayStmt) {
     super(id, "CARDINALITY CHECK");
     Preconditions.checkState(child.getLimit() <= 2);
     cardinality_ = 1;
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index bbff229..f22b835 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1194,7 +1194,7 @@ public class HdfsScanNode extends ScanNode {
         // Translate from the host index (local to the HdfsTable) to network address.
         int replicaHostIdx = FileBlock.getReplicaHostIdx(block, j);
         TNetworkAddress networkAddress =
-            partition.getTable().getHostIndex().getEntry(replicaHostIdx);
+            partition.getHostIndex().getEntry(replicaHostIdx);
         Preconditions.checkNotNull(networkAddress);
         // Translate from network address to the global (to this request) host index.
         Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index e4a2d0c..7e2d520 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -352,7 +352,7 @@ public class KuduScanNode extends ScanNode {
   }
 
   @Override
-  protected void computeStats(Analyzer analyzer) {
+  public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     computeNumNodes(analyzer);
 
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index a91cb4d..52afb59 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -590,7 +590,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
    * from init() (to facilitate inserting additional nodes during plan
    * partitioning w/o the need to call init() recursively on the whole tree again).
    */
-  protected void computeStats(Analyzer analyzer) {
+  public void computeStats(Analyzer analyzer) {
     avgRowSize_ = 0.0F;
     for (TupleId tid: tupleIds_) {
       TupleDescriptor desc = analyzer.getTupleDesc(tid);
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index e769fb0..bd4acb5 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -642,31 +642,37 @@ public class Planner {
     return newJoinNode;
   }
 
-  private void checkForSmallQueryOptimization(PlanNode singleNodePlan) {
+  public static void checkForSmallQueryOptimization(PlanNode singleNodePlan,
+      PlannerContext ctx) {
     MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
     singleNodePlan.accept(visitor);
     if (!visitor.valid()) return;
     // This optimization executes the plan on a single node so the threshold must
     // be based on the total number of rows processed.
     long maxRowsProcessed = visitor.getMaxRowsProcessed();
-    int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold;
+    int threshold = ctx.getQueryOptions().exec_single_node_rows_threshold;
     if (maxRowsProcessed < threshold) {
       // Execute on a single node and disable codegen for small results
       LOG.trace("Query is small enough to execute on a single node: maxRowsProcessed = "
           + maxRowsProcessed);
-      ctx_.getQueryOptions().setNum_nodes(1);
-      ctx_.getQueryCtx().disable_codegen_hint = true;
-      if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
-          maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
+      ctx.getQueryOptions().setNum_nodes(1);
+      ctx.getQueryCtx().disable_codegen_hint = true;
+      if (maxRowsProcessed < ctx.getQueryOptions().batch_size ||
+          maxRowsProcessed < 1024 && ctx.getQueryOptions().batch_size == 0) {
         // Only one scanner thread for small queries
-        ctx_.getQueryOptions().setNum_scanner_threads(1);
+        ctx.getQueryOptions().setNum_scanner_threads(1);
       }
       // disable runtime filters
-      ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF);
+      ctx.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF);
     }
   }
 
-  private void checkForDisableCodegen(PlanNode distributedPlan) {
+  private void checkForSmallQueryOptimization(PlanNode singleNodePlan) {
+      checkForSmallQueryOptimization(singleNodePlan, ctx_);
+  }
+
+  public static void checkForDisableCodegen(PlanNode distributedPlan,
+      PlannerContext ctx) {
     MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
     distributedPlan.accept(visitor);
     if (!visitor.valid()) return;
@@ -674,11 +680,15 @@ public class Planner {
     // reduce per-node execution time enough to justify the cost of codegen. Per-node
     // execution time is correlated with the number of rows flowing through the plan.
     if (visitor.getMaxRowsProcessedPerNode()
-        < ctx_.getQueryOptions().getDisable_codegen_rows_threshold()) {
-      ctx_.getQueryCtx().disable_codegen_hint = true;
+        < ctx.getQueryOptions().getDisable_codegen_rows_threshold()) {
+      ctx.getQueryCtx().disable_codegen_hint = true;
     }
   }
 
+  private void checkForDisableCodegen(PlanNode distributedPlan) {
+      checkForDisableCodegen(distributedPlan, ctx_);
+  }
+
   /**
    * Insert a sort node on top of the plan, depending on the clustered/noclustered
    * plan hint and on the 'sort.columns' table property. If clustering is enabled in
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 79913ed..225c372 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -293,7 +293,7 @@ public class SortNode extends PlanNode {
   }
 
   @Override
-  protected void computeStats(Analyzer analyzer) {
+  public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     if (isTypeTopN() && includeTies_) {
       cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_, limitWithTies_);
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index f964c6d..eb57df6 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -85,7 +85,7 @@ public class SubplanNode extends PlanNode {
   }
 
   @Override
-  protected void computeStats(Analyzer analyzer) {
+  public void computeStats(Analyzer analyzer) {
     super.computeStats(analyzer);
     if (getChild(0).cardinality_ != -1 && getChild(1).cardinality_ != -1) {
       cardinality_ =