You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2021/03/03 00:38:47 UTC
[impala] 05/06: IMPALA-10524: Changes to HdfsPartition for third
party extensions.
This is an automated email from the ASF dual-hosted git repository.
tmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 3554b0752d4be142dabb82b6b751323cde5323d8
Author: Steve Carlin <sc...@cloudera.com>
AuthorDate: Sat Feb 20 07:10:43 2021 -0800
IMPALA-10524: Changes to HdfsPartition for third party extensions.
Some changes are needed to HdfsPartition and other related classes
to allow for third-party extensions. These changes include:
- A protected constructor which will allow a subclass to instantiate
HdfsPartition using its own Builder.
- Widened access modifiers on various classes, methods and variables so
that they are visible to third-party extensions.
- Added a getHostIndex() method to FeFsPartition to allow a subclass to
override how the host index is retrieved.
- Added a new default method "getFileSystem()" to FeFsPartition which
will allow the third party extension to override how the filesystem
is obtained from the partition object.
Change-Id: I5a792642f27228118ac8f2e8ef98e8ba7aee4a46
Reviewed-on: http://gerrit.cloudera.org:8080/17092
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
.../org/apache/impala/analysis/AnalyticWindow.java | 5 ++--
.../org/apache/impala/analysis/StatementBase.java | 5 ++--
.../org/apache/impala/catalog/FeFsPartition.java | 17 ++++++++++++
.../org/apache/impala/catalog/HdfsPartition.java | 27 +++++++++++++++++-
.../catalog/HdfsPartitionLocationCompressor.java | 2 +-
.../impala/catalog/local/LocalFsPartition.java | 5 ++++
.../apache/impala/planner/AnalyticEvalNode.java | 2 +-
.../impala/planner/CardinalityCheckNode.java | 2 +-
.../org/apache/impala/planner/HdfsScanNode.java | 2 +-
.../org/apache/impala/planner/KuduScanNode.java | 2 +-
.../java/org/apache/impala/planner/PlanNode.java | 2 +-
.../java/org/apache/impala/planner/Planner.java | 32 ++++++++++++++--------
.../java/org/apache/impala/planner/SortNode.java | 2 +-
.../org/apache/impala/planner/SubplanNode.java | 2 +-
14 files changed, 82 insertions(+), 25 deletions(-)
diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
index 7b460f9..48b130c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalyticWindow.java
@@ -42,7 +42,7 @@ public class AnalyticWindow {
new Boundary(BoundaryType.UNBOUNDED_PRECEDING, null),
new Boundary(BoundaryType.CURRENT_ROW, null));
- enum Type {
+ public enum Type {
ROWS("ROWS"),
RANGE("RANGE");
@@ -131,8 +131,7 @@ public class AnalyticWindow {
this(type, e, null);
}
- // c'tor used by clone()
- private Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) {
+ public Boundary(BoundaryType type, Expr e, BigDecimal offsetValue) {
Preconditions.checkState(
(type.isOffset() && e != null)
|| (!type.isOffset() && e == null));
diff --git a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
index e0fb2de..4fb2d43 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StatementBase.java
@@ -200,8 +200,9 @@ public abstract class StatementBase extends StmtNode {
* If strictDecimal is true, only consider casts that result in no loss of information
* when casting between decimal types.
*/
- protected Expr checkTypeCompatibility(String dstTableName, Column dstCol, Expr srcExpr,
- boolean strictDecimal, Expr widestTypeSrcExpr) throws AnalysisException {
+ public static Expr checkTypeCompatibility(String dstTableName, Column dstCol,
+ Expr srcExpr, boolean strictDecimal, Expr widestTypeSrcExpr)
+ throws AnalysisException {
Type dstColType = dstCol.getType();
Type srcExprType = srcExpr.getType();
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
index 4b66148..8ec49bb 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeFsPartition.java
@@ -16,18 +16,23 @@
// under the License.
package org.apache.impala.catalog;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.impala.analysis.LiteralExpr;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.THdfsPartitionLocation;
+import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPartitionStats;
+import org.apache.impala.util.ListMap;
/**
* Frontend interface for interacting with a single filesystem-based partition.
@@ -50,6 +55,11 @@ public interface FeFsPartition {
FeFsTable getTable();
/**
+ * @return ListMap<hostIndex> from partition's table.
+ */
+ ListMap<TNetworkAddress> getHostIndex();
+
+ /**
* @return the FsType that this partition is stored on
*/
FileSystemUtil.FsType getFsType();
@@ -95,6 +105,13 @@ public interface FeFsPartition {
Path getLocationPath();
/**
+ * @return the FileSystem of this partition
+ */
+ default FileSystem getFileSystem(Configuration conf) throws IOException {
+ return getLocationPath().getFileSystem(conf);
+ }
+
+ /**
* @return the HDFS permissions Impala has to this partition's directory - READ_ONLY,
* READ_WRITE, etc.
*/
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 3d3b0c3..28f1f3d 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -682,7 +682,29 @@ public class HdfsPartition extends CatalogObjectImpl
// it's not used in coordinators.
private final InFlightEvents inFlightEvents_;
- private HdfsPartition(HdfsTable table, long id, long prevId, String partName,
+ /**
+ * Constructor. Needed for third party extensions that want to use their own builder
+ * to construct the object.
+ */
+ protected HdfsPartition(HdfsTable table, long prevId, String partName,
+ List<LiteralExpr> partitionKeyValues, HdfsStorageDescriptor fileFormatDescriptor,
+ @Nonnull ImmutableList<byte[]> encodedFileDescriptors,
+ ImmutableList<byte[]> encodedInsertFileDescriptors,
+ ImmutableList<byte[]> encodedDeleteFileDescriptors,
+ HdfsPartitionLocationCompressor.Location location,
+ boolean isMarkedCached, TAccessLevel accessLevel, Map<String, String> hmsParameters,
+ CachedHmsPartitionDescriptor cachedMsPartitionDescriptor,
+ byte[] partitionStats, boolean hasIncrementalStats, long numRows, long writeId,
+ InFlightEvents inFlightEvents) {
+ this(table, partitionIdCounter_.getAndIncrement(), prevId, partName,
+ partitionKeyValues, fileFormatDescriptor, encodedFileDescriptors,
+ encodedInsertFileDescriptors, encodedDeleteFileDescriptors, location,
+ isMarkedCached, accessLevel, hmsParameters, cachedMsPartitionDescriptor,
+ partitionStats, hasIncrementalStats, numRows, writeId, inFlightEvents);
+ }
+
+
+ protected HdfsPartition(HdfsTable table, long id, long prevId, String partName,
List<LiteralExpr> partitionKeyValues, HdfsStorageDescriptor fileFormatDescriptor,
@Nonnull ImmutableList<byte[]> encodedFileDescriptors,
ImmutableList<byte[]> encodedInsertFileDescriptors,
@@ -786,6 +808,9 @@ public class HdfsPartition extends CatalogObjectImpl
@Override // FeFsPartition
public HdfsTable getTable() { return table_; }
+ @Override // FeFsPartition
+ public ListMap<TNetworkAddress> getHostIndex() { return table_.getHostIndex(); }
+
@Override
public FileSystemUtil.FsType getFsType() {
Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
index 939bcf6..2ba6414 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartitionLocationCompressor.java
@@ -35,7 +35,7 @@ import com.google.common.base.Preconditions;
* prefixes, like table locations.
*
*/
-class HdfsPartitionLocationCompressor {
+public class HdfsPartitionLocationCompressor {
int numClusteringColumns_;
// A bi-directional map between partition location prefixes and their compressed
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
index 1bed832..575e01a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsPartition.java
@@ -39,7 +39,9 @@ import org.apache.impala.common.FileSystemUtil;
import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.THdfsPartitionLocation;
+import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TPartitionStats;
+import org.apache.impala.util.ListMap;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -104,6 +106,9 @@ public class LocalFsPartition implements FeFsPartition {
return table_;
}
+ @Override // FeFsPartition
+ public ListMap<TNetworkAddress> getHostIndex() { return table_.getHostIndex(); }
+
@Override
public FileSystemUtil.FsType getFsType() {
Preconditions.checkNotNull(getLocationPath().toUri().getScheme(),
diff --git a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
index 31463cf..252df3b 100644
--- a/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/AnalyticEvalNode.java
@@ -251,7 +251,7 @@ public class AnalyticEvalNode extends PlanNode {
}
@Override
- protected void computeStats(Analyzer analyzer) {
+ public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
cardinality_ = getChild(0).cardinality_;
cardinality_ = capCardinalityAtLimit(cardinality_);
diff --git a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
index f7629a1..00cd67f 100644
--- a/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/CardinalityCheckNode.java
@@ -40,7 +40,7 @@ import com.google.common.base.Preconditions;
public class CardinalityCheckNode extends PlanNode {
private final String displayStatement_;
- protected CardinalityCheckNode(PlanNodeId id, PlanNode child, String displayStmt) {
+ public CardinalityCheckNode(PlanNodeId id, PlanNode child, String displayStmt) {
super(id, "CARDINALITY CHECK");
Preconditions.checkState(child.getLimit() <= 2);
cardinality_ = 1;
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index bbff229..f22b835 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1194,7 +1194,7 @@ public class HdfsScanNode extends ScanNode {
// Translate from the host index (local to the HdfsTable) to network address.
int replicaHostIdx = FileBlock.getReplicaHostIdx(block, j);
TNetworkAddress networkAddress =
- partition.getTable().getHostIndex().getEntry(replicaHostIdx);
+ partition.getHostIndex().getEntry(replicaHostIdx);
Preconditions.checkNotNull(networkAddress);
// Translate from network address to the global (to this request) host index.
Integer globalHostIdx = analyzer.getHostIndex().getIndex(networkAddress);
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index e4a2d0c..7e2d520 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -352,7 +352,7 @@ public class KuduScanNode extends ScanNode {
}
@Override
- protected void computeStats(Analyzer analyzer) {
+ public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
computeNumNodes(analyzer);
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanNode.java b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
index a91cb4d..52afb59 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanNode.java
@@ -590,7 +590,7 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
* from init() (to facilitate inserting additional nodes during plan
* partitioning w/o the need to call init() recursively on the whole tree again).
*/
- protected void computeStats(Analyzer analyzer) {
+ public void computeStats(Analyzer analyzer) {
avgRowSize_ = 0.0F;
for (TupleId tid: tupleIds_) {
TupleDescriptor desc = analyzer.getTupleDesc(tid);
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index e769fb0..bd4acb5 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -642,31 +642,37 @@ public class Planner {
return newJoinNode;
}
- private void checkForSmallQueryOptimization(PlanNode singleNodePlan) {
+ public static void checkForSmallQueryOptimization(PlanNode singleNodePlan,
+ PlannerContext ctx) {
MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
singleNodePlan.accept(visitor);
if (!visitor.valid()) return;
// This optimization executes the plan on a single node so the threshold must
// be based on the total number of rows processed.
long maxRowsProcessed = visitor.getMaxRowsProcessed();
- int threshold = ctx_.getQueryOptions().exec_single_node_rows_threshold;
+ int threshold = ctx.getQueryOptions().exec_single_node_rows_threshold;
if (maxRowsProcessed < threshold) {
// Execute on a single node and disable codegen for small results
LOG.trace("Query is small enough to execute on a single node: maxRowsProcessed = "
+ maxRowsProcessed);
- ctx_.getQueryOptions().setNum_nodes(1);
- ctx_.getQueryCtx().disable_codegen_hint = true;
- if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
- maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
+ ctx.getQueryOptions().setNum_nodes(1);
+ ctx.getQueryCtx().disable_codegen_hint = true;
+ if (maxRowsProcessed < ctx.getQueryOptions().batch_size ||
+ maxRowsProcessed < 1024 && ctx.getQueryOptions().batch_size == 0) {
// Only one scanner thread for small queries
- ctx_.getQueryOptions().setNum_scanner_threads(1);
+ ctx.getQueryOptions().setNum_scanner_threads(1);
}
// disable runtime filters
- ctx_.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF);
+ ctx.getQueryOptions().setRuntime_filter_mode(TRuntimeFilterMode.OFF);
}
}
- private void checkForDisableCodegen(PlanNode distributedPlan) {
+ private void checkForSmallQueryOptimization(PlanNode singleNodePlan) {
+ checkForSmallQueryOptimization(singleNodePlan, ctx_);
+ }
+
+ public static void checkForDisableCodegen(PlanNode distributedPlan,
+ PlannerContext ctx) {
MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
distributedPlan.accept(visitor);
if (!visitor.valid()) return;
@@ -674,11 +680,15 @@ public class Planner {
// reduce per-node execution time enough to justify the cost of codegen. Per-node
// execution time is correlated with the number of rows flowing through the plan.
if (visitor.getMaxRowsProcessedPerNode()
- < ctx_.getQueryOptions().getDisable_codegen_rows_threshold()) {
- ctx_.getQueryCtx().disable_codegen_hint = true;
+ < ctx.getQueryOptions().getDisable_codegen_rows_threshold()) {
+ ctx.getQueryCtx().disable_codegen_hint = true;
}
}
+ private void checkForDisableCodegen(PlanNode distributedPlan) {
+ checkForDisableCodegen(distributedPlan, ctx_);
+ }
+
/**
* Insert a sort node on top of the plan, depending on the clustered/noclustered
* plan hint and on the 'sort.columns' table property. If clustering is enabled in
diff --git a/fe/src/main/java/org/apache/impala/planner/SortNode.java b/fe/src/main/java/org/apache/impala/planner/SortNode.java
index 79913ed..225c372 100644
--- a/fe/src/main/java/org/apache/impala/planner/SortNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SortNode.java
@@ -293,7 +293,7 @@ public class SortNode extends PlanNode {
}
@Override
- protected void computeStats(Analyzer analyzer) {
+ public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
if (isTypeTopN() && includeTies_) {
cardinality_ = capCardinalityAtLimit(getChild(0).cardinality_, limitWithTies_);
diff --git a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
index f964c6d..eb57df6 100644
--- a/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/SubplanNode.java
@@ -85,7 +85,7 @@ public class SubplanNode extends PlanNode {
}
@Override
- protected void computeStats(Analyzer analyzer) {
+ public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
if (getChild(0).cardinality_ != -1 && getChild(1).cardinality_ != -1) {
cardinality_ =