Posted to commits@kylin.apache.org by xx...@apache.org on 2021/02/07 14:24:00 UTC
[kylin] branch kylin-on-parquet-v2 updated: KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin-on-parquet-v2 by this push:
new b9cd81a KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner
b9cd81a is described below
commit b9cd81a0b1710913af469b04d44f3451d6a87f0c
Author: Zhichao Zhang <44...@qq.com>
AuthorDate: Mon Feb 1 16:57:02 2021 +0800
KYLIN-4892 Reduce the times of fetching files status from HDFS Namenode in FilePruner
(cherry picked from commit 6e4d94d1c027d5877eb3013f37ef223aa0532cc2)
(cherry picked from commit edebb98ca33e1f3ddf000842f12d0bff45109c57)
---
.../sql/execution/datasource/FilePruner.scala | 109 +++++++++++----------
.../org/apache/spark/sql/SparderContext.scala | 5 +-
.../engine/spark2/NBadQueryAndPushDownTest.java | 2 +-
3 files changed, 62 insertions(+), 54 deletions(-)
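
For context on the change itself: before this commit, FilePruner built segmentDirs eagerly, creating an InMemoryFileIndex and listing the leaf files of every READY segment as soon as the pruner was constructed, which meant a round of file-status RPCs to the HDFS NameNode per segment on every query, whether or not the segment survived pruning. After this commit, segmentDirs only records the segment name and storage location identifier; the actual file statuses are fetched later in listFiles, in parallel, and only for the segments left after time-partition pruning, with the results stored in a shared FileStatusCache so follow-up queries can skip the NameNode entirely.

The sketch below illustrates that fetch-or-cache step. It is a simplified stand-in for the new getFileStatusBySeg method, not its exact body: leafFilesFor is a hypothetical helper name, the segment path is reduced to a plain string, and it assumes the code lives in a package under org.apache.spark.sql (as Kylin's datasource classes do) so that Spark's internal FileStatusCache is visible.

    import org.apache.hadoop.fs.{FileStatus, Path}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.FileStatusCache

    // Hypothetical helper mirroring getFileStatusBySeg: consult the shared
    // FileStatusCache first, and fall back to a single listStatus call against
    // the NameNode only when the segment's directory is not cached yet.
    def leafFilesFor(session: SparkSession,
                     fsc: FileStatusCache,
                     segmentPath: String): Seq[FileStatus] = {
      val path = new Path(segmentPath)
      val fs = path.getFileSystem(session.sparkContext.hadoopConfiguration)
      if (fs.exists(path) && fs.isDirectory(path)) {
        fsc.getLeafFiles(path) match {
          case Some(cached) => cached.toSeq      // served from cache, no NameNode call
          case None =>
            val statuses = fs.listStatus(path)   // one RPC for an uncached segment
            fsc.putLeafFiles(path, statuses)     // remember it for later queries
            statuses.toSeq
        }
      } else {
        Nil                                      // a missing path contributes no files
      }
    }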
diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
index 9f62f52..f0f7916 100644
--- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
+++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala
@@ -51,10 +51,9 @@ case class SegmentDirectory(segmentName: String, identifier: String, files: Seq[
* @param shardColumnNames the names of the columns that used to generate the shard id.
* @param sortColumnNames the names of the columns that used to sort data in each shard.
*/
-case class ShardSpec(
- numShards: Int,
- shardColumnNames: Seq[String],
- sortColumnNames: Seq[String]) {
+case class ShardSpec(numShards: Int,
+ shardColumnNames: Seq[String],
+ sortColumnNames: Seq[String]) {
if (numShards <= 0) {
throw new AnalysisException(
@@ -72,28 +71,17 @@ case class ShardSpec(
}
}
-class FilePruner(
- cubeInstance: CubeInstance,
- cuboid: Cuboid,
- val session: SparkSession,
- val options: Map[String, String])
+class FilePruner(cubeInstance: CubeInstance,
+ cuboid: Cuboid,
+ val session: SparkSession,
+ val options: Map[String, String])
extends FileIndex with ResetShufflePartition with Logging {
private lazy val segmentDirs: Seq[SegmentDirectory] = {
cubeInstance.getSegments.asScala
.filter(_.getStatus.equals(SegmentStatusEnum.READY)).map(seg => {
- val segName = seg.getName
- val path = PathManager.getParquetStoragePath(cubeInstance, segName, seg.getStorageLocationIdentifier, layoutEntity.getId)
- val files = new InMemoryFileIndex(session,
- Seq(new Path(path)),
- options,
- Some(dataSchema),
- FileStatusCache.getOrCreate(session))
- .listFiles(Nil, Nil)
- .flatMap(_.files)
- .filter(_.isFile)
- SegmentDirectory(segName, seg.getStorageLocationIdentifier, files)
- }).filter(_.files.nonEmpty)
+ SegmentDirectory(seg.getName, seg.getStorageLocationIdentifier, Nil)
+ })
}
val layoutEntity = MetadataConverter.toLayoutEntity(cubeInstance, cuboid)
@@ -103,7 +91,8 @@ class FilePruner(
.map { column => StructField(column.id.toString, column.dataType) }
.toSeq ++
layoutEntity.getOrderedMeasures.asScala
- .map { entry => StructField(entry._1.toString, SparkTypeUtil.generateFunctionReturnDataType(entry._2)) }
+ .map { entry =>
+ StructField(entry._1.toString, SparkTypeUtil.generateFunctionReturnDataType(entry._2)) }
.toSeq)
}
@@ -115,7 +104,6 @@ class FilePruner(
PathManager.getParquetStoragePath(cubeInstance, segmentName, identifier, cuboid.getId)
}
-
override lazy val partitionSchema: StructType = {
// we did not use the partitionBy mechanism of spark
new StructType()
@@ -130,7 +118,8 @@ class FilePruner(
val ref = desc.getPartitionDateColumnRef
// only consider partition date column
// we can only get col ID in layout cuz data schema is all ids.
- val id = layoutEntity.getOrderedDimensions.asScala.values.find(column => column.columnName.equals(ref.getName))
+ val id = layoutEntity.getOrderedDimensions.asScala.values.find(
+ column => column.columnName.equals(ref.getName))
if (id.isDefined && (ref.getType.isDateTimeFamily || ref.getType.isStringFamily)) {
pattern = desc.getPartitionDateFormat
dataSchema.filter(_.name == String.valueOf(id.get.id))
@@ -208,12 +197,32 @@ class FilePruner(
var cached = new java.util.HashMap[(Seq[Expression], Seq[Expression]), Seq[PartitionDirectory]]()
- override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+ private def getFileStatusBySeg(seg: SegmentDirectory, fsc: FileStatusCache): SegmentDirectory = {
+ val path = new Path(toPath(seg.segmentName, seg.identifier))
+ val fs = path.getFileSystem(session.sparkContext.hadoopConfiguration)
+ if (fs.isDirectory(path) && fs.exists(path)) {
+ val maybeStatuses = fsc.getLeafFiles(path)
+ if (maybeStatuses.isDefined) {
+ SegmentDirectory(seg.segmentName, seg.identifier, maybeStatuses.get)
+ } else {
+ val statuses = fs.listStatus(path)
+ fsc.putLeafFiles(path, statuses)
+ SegmentDirectory(seg.segmentName, seg.identifier, statuses)
+ }
+ } else {
+ logWarning(s"Segment path ${path.toString} not exists.")
+ SegmentDirectory(seg.segmentName, seg.identifier, Nil)
+ }
+ }
+
+ override def listFiles(partitionFilters: Seq[Expression],
+ dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
if (cached.containsKey((partitionFilters, dataFilters))) {
return cached.get((partitionFilters, dataFilters))
}
require(isResolved)
+ val startTime = System.nanoTime
val timePartitionFilters = getSpecFilter(dataFilters, timePartitionColumn)
logInfo(s"Applying time partition filters: ${timePartitionFilters.mkString(",")}")
@@ -223,19 +232,12 @@ class FilePruner(
var selected = afterPruning("segment", timePartitionFilters, segmentDirs) {
pruneSegments
}
- // QueryContextFacade.current().record("seg_pruning")
- selected = selected.par.map { e =>
- val path = new Path(toPath(e.segmentName, e.identifier))
- val maybeStatuses = fsc.getLeafFiles(path)
- if (maybeStatuses.isDefined) {
- SegmentDirectory(e.segmentName, e.identifier, maybeStatuses.get)
- } else {
- val statuses = path.getFileSystem(session.sparkContext.hadoopConfiguration).listStatus(path)
- fsc.putLeafFiles(path, statuses)
- SegmentDirectory(e.segmentName, e.identifier, statuses)
- }
- }.toIterator.toSeq
- // QueryContextFacade.current().record("fetch_file_status")
+
+ // fetch segment directories info in parallel
+ selected = selected.par.map(seg => {
+ getFileStatusBySeg(seg, fsc)
+ }).filter(_.files.nonEmpty).seq
+
// shards pruning
selected = afterPruning("shard", dataFilters, selected) {
pruneShards
@@ -246,6 +248,7 @@ class FilePruner(
val totalFileSize = selected.flatMap(partition => partition.files).map(_.getLen).sum
logInfo(s"totalFileSize is ${totalFileSize}")
setShufflePartitions(totalFileSize, session)
+ logInfo(s"Files pruning in ${(System.nanoTime() - startTime).toDouble / 1000000} ms")
if (selected.isEmpty) {
val value = Seq.empty[PartitionDirectory]
cached.put((partitionFilters, dataFilters), value)
@@ -255,11 +258,12 @@ class FilePruner(
cached.put((partitionFilters, dataFilters), value)
value
}
-
}
- private def afterPruning(pruningType: String, specFilters: Seq[Expression], inputs: Seq[SegmentDirectory])
- (pruningFunc: (Seq[Expression], Seq[SegmentDirectory]) => Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+ private def afterPruning(pruningType: String, specFilters: Seq[Expression],
+ inputs: Seq[SegmentDirectory])
+ (pruningFunc: (Seq[Expression], Seq[SegmentDirectory]) =>
+ Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
if (specFilters.isEmpty) {
inputs
} else {
@@ -281,9 +285,8 @@ class FilePruner(
dataFilters.filter(_.references.subsetOf(AttributeSet(col)))
}
- private def pruneSegments(
- filters: Seq[Expression],
- segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+ private def pruneSegments(filters: Seq[Expression],
+ segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
val filteredStatuses = if (filters.isEmpty) {
segDirs
@@ -298,7 +301,8 @@ class FilePruner(
val pruned = segDirs.filter {
e => {
val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
- SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
+ SegFilters(tsRange.startValue, tsRange.endValue, pattern)
+ .foldFilter(reducedFilter) match {
case Trivial(true) => true
case Trivial(false) => false
}
@@ -312,7 +316,7 @@ class FilePruner(
}
private def pruneShards(filters: Seq[Expression],
- segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
+ segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
segDirs
} else {
@@ -323,7 +327,8 @@ class FilePruner(
val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
- val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
+ val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name,
+ partitionNumber)
val selected = files.filter(f => {
val partitionId = FilePruner.getPartitionId(f.getPath)
@@ -331,7 +336,8 @@ class FilePruner(
})
SegmentDirectory(segName, segIdentifier, selected)
}
- logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
+ logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files)
+ .map(_.getPath.toString).mkString(";"))
pruned
}
filteredStatuses
@@ -348,10 +354,9 @@ class FilePruner(
override def refresh(): Unit = {}
- private def getExpressionShards(
- expr: Expression,
- shardColumnName: String,
- numShards: Int): BitSet = {
+ private def getExpressionShards(expr: Expression,
+ shardColumnName: String,
+ numShards: Int): BitSet = {
def getShardNumber(attr: Attribute, v: Any): Int = {
BucketingUtils.getBucketIdFromValue(attr, numShards, v)
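
The listFiles hunk above applies this helper only after segment pruning: the surviving segments are resolved in parallel over a Scala parallel collection, and segments whose directories turn out to be empty or missing are dropped before shard pruning, which is where the old .filter(_.files.nonEmpty) from segmentDirs moved to. Below is a rough, self-contained restatement of that composition, reusing the hypothetical leafFilesFor sketch above; the real code maps getFileStatusBySeg over SegmentDirectory values.

    import org.apache.hadoop.fs.FileStatus
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.FileStatusCache

    // Resolve file statuses only for segment paths that survived pruning,
    // in parallel, and drop paths that yielded no files, mirroring
    // selected.par.map(getFileStatusBySeg(...)).filter(_.files.nonEmpty).seq.
    def resolveSurvivingSegments(session: SparkSession,
                                 fsc: FileStatusCache,
                                 prunedPaths: Seq[String]): Seq[(String, Seq[FileStatus])] = {
      prunedPaths.par
        .map(p => p -> leafFilesFor(session, fsc, p)) // cache-aware lookup per segment
        .filter(_._2.nonEmpty)
        .seq
    }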
diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
index c3d089d..aaec4d5 100644
--- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
+++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/spark/sql/SparderContext.scala
@@ -36,7 +36,7 @@ import org.apache.kylin.common.KylinConfig
import org.apache.kylin.query.monitor.SparderContextCanary
import org.apache.kylin.spark.classloader.ClassLoaderUtils
import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
-import org.apache.spark.sql.execution.datasource.KylinSourceStrategy
+import org.apache.spark.sql.execution.datasource.{KylinSourceStrategy, ShardFileStatusCache}
import org.apache.spark.sql.metrics.SparderMetricsListener
import org.apache.spark.utils.YarnInfoFetcherUtils
@@ -202,6 +202,9 @@ object SparderContext extends Logging {
//monitor sparder
SparderContextCanary.init()
}
+
+ // init FileStatusCache
+ ShardFileStatusCache.getFileStatusCache(getOriginalSparkSession)
}
}
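
The SparderContext change above initializes the shared cache once, when the Sparder session is set up, so the first query does not pay that setup cost inside FilePruner. The source of ShardFileStatusCache is not part of this diff; only the call ShardFileStatusCache.getFileStatusCache(getOriginalSparkSession) is visible. Purely as an assumption about its shape, a minimal singleton that delegates to Spark's FileStatusCache.getOrCreate could look roughly like this (again assuming a package under org.apache.spark.sql for visibility):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.datasources.FileStatusCache

    // Hypothetical sketch only; the real ShardFileStatusCache is not shown in this commit.
    object ShardFileStatusCacheSketch {
      @volatile private var cache: FileStatusCache = _

      def getFileStatusCache(session: SparkSession): FileStatusCache = {
        if (cache == null) {
          synchronized {
            if (cache == null) {
              // Spark's built-in cache; its capacity is controlled by
              // spark.sql.hive.filesourcePartitionFileCacheSize.
              cache = FileStatusCache.getOrCreate(session)
            }
          }
        }
        cache
      }
    }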
diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
index 3e50415..744922a 100644
--- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
+++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBadQueryAndPushDownTest.java
@@ -92,7 +92,7 @@ public class NBadQueryAndPushDownTest extends LocalWithSparkSessionTest {
@Test
public void testPushDownForFileNotExist() throws Exception {
- final String sql = "select max(price) from test_kylin_fact";
+ final String sql = "select max(ITEM_COUNT) from test_kylin_fact";
KylinConfig.getInstanceFromEnv().setProperty(PUSHDOWN_RUNNER_KEY,
"org.apache.kylin.query.pushdown.PushDownRunnerSparkImpl");
try {