Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/09/12 14:57:26 UTC

[carbondata] branch master updated: [CARBONDATA-3967] cache partition on select to enable faster pruning

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6396ce8  [CARBONDATA-3967] cache partition on select to enable faster pruning
6396ce8 is described below

commit 6396ce81ae30b38f8eddea00c0c15068c36558b3
Author: kunal642 <ku...@gmail.com>
AuthorDate: Thu Aug 27 14:57:04 2020 +0530

    [CARBONDATA-3967] cache partition on select to enable faster pruning
    
    Why is this PR needed?
    Spark's getPartition method can sometimes take a lot of time, which can degrade query performance.
    
    What changes were proposed in this PR?
    Cache the partitions on each load and query so that subsequent queries can run faster. Caching is done through the expiring-map interface, so the existing cache expiration time also applies to this cache.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3908
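
To make the caching scheme concrete, here is a minimal sketch of the shape of one cache entry (the sketch name is illustrative; the real structure is CacheablePartitionSpec in the PartitionCacheManager.scala file added below). Each table gets one LRU entry keyed by its table id; inside it, every valid segment maps to the partitions read from that segment's segment file together with the file's last-modified time, so a segment is re-read only when its segment file changes, and invalid segments are dropped from the cache.

    // Illustrative sketch only; mirrors CacheablePartitionSpec in the new file below.
    import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition

    // segmentNo -> (partitions parsed from that segment's segment file,
    //               segment file last-modified time used to decide on re-reads)
    case class CacheablePartitionSpecSketch(
        partitionSpecs: Map[String, (Seq[CatalogTablePartition], Long)])
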
---
 .../core/constants/CarbonCommonConstants.java      |   7 +
 docs/configuration-parameters.md                   |   1 +
 .../spark/sql/hive/CarbonSessionCatalog.scala      |   7 +-
 .../spark/sql/hive/CarbonSessionCatalogUtil.scala  |   7 +-
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  |  20 ++-
 .../apache/spark/sql/optimizer/CarbonFilters.scala |   8 +-
 .../apache/spark/util/PartitionCacheManager.scala  | 181 +++++++++++++++++++++
 .../spark/sql/hive/CarbonSessionStateBuilder.scala |   7 +-
 .../spark/sql/hive/CarbonSessionStateBuilder.scala |   9 +-
 .../StandardPartitionTableLoadingTestCase.scala    |  40 +++++
 10 files changed, 261 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index c498c20..d34f357 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1274,6 +1274,13 @@ public final class CarbonCommonConstants {
   public static final String CARBON_MAX_DRIVER_LRU_CACHE_SIZE = "carbon.max.driver.lru.cache.size";
 
   /**
+   * max driver lru cache size upto which partition lru cache will be loaded in memory
+   */
+  @CarbonProperty
+  public static final String CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE =
+      "carbon.partition.max.driver.lru.cache.size";
+
+  /**
    * max executor lru cache size upto which lru cache will be loaded in memory
    */
   @CarbonProperty
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 38165fd..70fc3f3 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -146,6 +146,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.secondary.index.creation.threads | 1 | Specifies the number of threads to concurrently process segments during secondary index creation. This property helps fine tuning the system when there are a lot of segments in a table. The value range is 1 to 50. |
 | carbon.si.lookup.partialstring | true | When true, it includes starts with, ends with and contains. When false, it includes only starts with secondary indexes. |
 | carbon.max.pagination.lru.cache.size.in.mb | -1 | Maximum memory **(in MB)** upto which the SDK pagination reader can cache the blocklet rows. Suggest to configure as multiple of blocklet size. Default value of -1 means there is no memory limit for caching. Only integer values greater than 0 are accepted. |
+| carbon.partition.max.driver.lru.cache.size | -1 | Maximum memory **(in MB)** upto which driver can cache partition metadata. Beyond this, least recently used data will be removed from cache before loading new set of values.
 
 ## Data Mutation Configuration
 | Parameter | Default Value | Description |
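
For reference, the new limit can be set like any other carbon property before queries are run. A usage sketch follows (the value 512 is arbitrary, and CarbonProperties is assumed to be the usual org.apache.carbondata.core.util.CarbonProperties singleton):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Cap the driver-side partition metadata cache at 512 MB.
    // The default of -1 means no limit; beyond the configured size the least
    // recently used entries are evicted before new values are cached.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE, "512")
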
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
index 7351b5f..aa80dd4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalog.scala
@@ -23,7 +23,8 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
 
 import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
-import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema => ColumnSchema}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
 /**
  * This interface defines those common api used by carbon for spark-2.1 and spark-2.2 integration,
@@ -55,11 +56,11 @@ trait CarbonSessionCatalog {
    *
    * @param partitionFilters
    * @param sparkSession
-   * @param identifier
+   * @param carbonTable
    * @return
    */
   def getPartitionsAlternate(partitionFilters: Seq[Expression], sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition]
+      carbonTable: CarbonTable): Seq[CatalogTablePartition]
 
   /**
    * Update the storage format with new location information
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
index beaebed..7205f67 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionCatalogUtil.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
 object CarbonSessionCatalogUtil {
@@ -132,14 +133,14 @@ object CarbonSessionCatalogUtil {
    * hive and then apply filter instead of querying hive along with filters.
    * @param partitionFilters
    * @param sparkSession
-   * @param identifier
+   * @param carbonTable
    * @return
    */
   def getPartitionsAlternate(
       partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionUtil.prunePartitionsByFilter(partitionFilters, sparkSession, identifier)
+      carbonTable: CarbonTable): Seq[CatalogTablePartition] = {
+    CarbonSessionUtil.pruneAndCachePartitionsByFilters(partitionFilters, sparkSession, carbonTable)
   }
 
   /**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index a6d3626..2d3af66 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -17,20 +17,24 @@
 
 package org.apache.spark.sql.hive
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, ExternalCatalogUtils}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTablePartition, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.{SparkSQLUtil, SparkTypeConverter}
-import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.spark.util.{CarbonReflectionUtils, PartitionCacheKey, PartitionCacheManager}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.util.CarbonUtil
 
 /**
  * This class refresh the relation from cache if the carbon table in
@@ -103,15 +107,17 @@ object CarbonSessionUtil {
    *
    * @param partitionFilters
    * @param sparkSession
-   * @param identifier
+   * @param carbonTable
    * @return
    */
-  def prunePartitionsByFilter(partitionFilters: Seq[Expression],
+  def pruneAndCachePartitionsByFilters(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+      carbonTable: CarbonTable): Seq[CatalogTablePartition] = {
+    val allPartitions = PartitionCacheManager.get(PartitionCacheKey(carbonTable.getTableId,
+      carbonTable.getTablePath, CarbonUtil.getExpiration_time(carbonTable))).asScala
     ExternalCatalogUtils.prunePartitionsByFilter(
-      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      sparkSession.sessionState.catalog.getTableMetadata(TableIdentifier(carbonTable.getTableName,
+        Some(carbonTable.getDatabaseName))),
       allPartitions,
       partitionFilters,
       sparkSession.sessionState.conf.sessionLocalTimeZone
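
A caller-side sketch of the renamed helper (how partitionFilters, sparkSession and carbonTable are obtained is assumed here; in CarbonData they come from the relation being queried, as the CarbonFilters hunk further below shows):

    // partitionFilters: Seq[Expression] pushed down from the query plan.
    // carbonTable: org.apache.carbondata.core.metadata.schema.table.CarbonTable of the queried table.
    val prunedPartitions: Seq[CatalogTablePartition] =
      CarbonSessionUtil.pruneAndCachePartitionsByFilters(partitionFilters, sparkSession, carbonTable)
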
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index e8a8c3a..f4e89f3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -523,10 +523,6 @@ object CarbonFilters {
 
   /**
    * Fetches partition information from hive
-   * @param partitionFilters
-   * @param sparkSession
-   * @param carbonTable
-   * @return
    */
   def getPartitions(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
@@ -547,7 +543,7 @@ object CarbonFilters {
           CarbonSessionCatalogUtil.getPartitionsAlternate(
             partitionFilters,
             sparkSession,
-            identifier)
+            carbonTable)
         }
       } catch {
         case e: Exception =>
@@ -555,7 +551,7 @@ object CarbonFilters {
           CarbonSessionCatalogUtil.getPartitionsAlternate(
             partitionFilters,
             sparkSession,
-            identifier)
+            carbonTable)
       }
     }
     Some(partitions.map { partition =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
new file mode 100644
index 0000000..411cbe2
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/util/PartitionCacheManager.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.net.URI
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.log4j.Logger
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.cache.{Cache, Cacheable, CarbonLRUCache}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.index.Segment
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object PartitionCacheManager extends Cache[PartitionCacheKey,
+  java.util.List[CatalogTablePartition]] {
+
+  private val CACHE = new CarbonLRUCache(
+    CarbonCommonConstants.CARBON_PARTITION_MAX_DRIVER_LRU_CACHE_SIZE,
+    CarbonCommonConstants.CARBON_MAX_LRU_CACHE_SIZE_DEFAULT)
+
+  val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def get(identifier: PartitionCacheKey): java.util.List[CatalogTablePartition] = {
+    LOGGER.info("Reading partition values from store")
+    // read the tableStatus file to get valid and invalid segments
+    val validInvalidSegments = new SegmentStatusManager(AbsoluteTableIdentifier.from(
+      identifier.tablePath, null, null, identifier.tableId))
+      .getValidAndInvalidSegments
+    val existingCache = CACHE.get(identifier.tableId)
+    val cacheablePartitionSpecs = validInvalidSegments.getValidSegments.asScala.map { segment =>
+      val segmentFileName = segment.getSegmentFileName
+      val segmentFilePath = FileFactory.getCarbonFile(
+        CarbonTablePath.getSegmentFilePath(identifier.tablePath, segmentFileName))
+      // read the last modified time
+      val segmentFileModifiedTime = segmentFilePath.getLastModifiedTime
+      if (existingCache != null) {
+        val segmentCache = existingCache.asInstanceOf[CacheablePartitionSpec]
+          .partitionSpecs.get(segment.getSegmentNo)
+        segmentCache match {
+          case Some(c) =>
+            // check if cache needs to be updated
+            if (segmentFileModifiedTime > c._2) {
+              (segment.getSegmentNo, (readPartition(identifier,
+                segmentFilePath.getAbsolutePath), segmentFileModifiedTime))
+            } else {
+              (segment.getSegmentNo, c)
+            }
+          case None =>
+            (segment.getSegmentNo, (readPartition(identifier,
+              segmentFilePath.getAbsolutePath), segmentFileModifiedTime))
+        }
+      } else {
+        // read the partitions if not available in cache.
+        (segment.getSegmentNo, (readPartition(identifier,
+          segmentFilePath.getAbsolutePath), segmentFileModifiedTime))
+      }
+    }.toMap
+    // remove all invalid segment entries from cache
+    val finalCache = cacheablePartitionSpecs --
+                     validInvalidSegments.getInvalidSegments.asScala.map(_.getSegmentNo)
+    val cacheObject = CacheablePartitionSpec(finalCache)
+    if (finalCache.nonEmpty) {
+      // remove the existing cache as new cache values may be added.
+      // CarbonLRUCache does not allow cache updation until time is expired.
+      // TODO: Need to fix!!
+      CACHE.remove(identifier.tableId)
+      CACHE.put(identifier.tableId,
+        cacheObject,
+        cacheObject.getMemorySize,
+        identifier.expirationTime)
+    }
+    finalCache.values.flatMap(_._1).toList.asJava
+  }
+
+  override def getAll(keys: util.List[PartitionCacheKey]):
+  util.List[util.List[CatalogTablePartition]] = {
+    keys.asScala.toList.map(get).asJava
+  }
+
+  override def getIfPresent(key: PartitionCacheKey): java.util.List[CatalogTablePartition] = {
+    CACHE.get(key.tableId).asInstanceOf[CacheablePartitionSpec].partitionSpecs.values.flatMap(_._1)
+      .toList.asJava
+  }
+
+  override def invalidate(partitionCacheKey: PartitionCacheKey): Unit = {
+    CACHE.remove(partitionCacheKey.tableId)
+  }
+
+  private def readPartition(identifier: PartitionCacheKey, segmentFilePath: String) = {
+    val segmentFile = SegmentFileStore.readSegmentFile(segmentFilePath)
+    segmentFile.getLocationMap.values().asScala
+      .flatMap(_.getPartitions.asScala).toSet.map { uniquePartition: String =>
+      val partitionSplit = uniquePartition.split("=")
+      val storageFormat = CatalogStorageFormat(
+        Some(new URI(identifier.tablePath + "/" + uniquePartition)),
+        None, None, None, compressed = false, Map())
+      CatalogTablePartition(Map(partitionSplit(0) -> partitionSplit(1)), storageFormat)
+    }.toSeq
+  }
+
+  override def put(key: PartitionCacheKey,
+      value: java.util.List[CatalogTablePartition]): Unit = {
+
+  }
+
+  override def clearAccessCount(keys: util.List[PartitionCacheKey]): Unit = {
+
+  }
+}
+
+case class PartitionCacheKey(tableId: String, tablePath: String, expirationTime: Long)
+
+/**
+ * Cacheable instance of the CatalogTablePartitions.
+ *
+ * Maintains a mapping of segmentNo -> (Seq[CatalogTablePartition], lastModifiedTime)
+ */
+case class CacheablePartitionSpec(partitionSpecs: Map[String, (Seq[CatalogTablePartition], Long)])
+  extends Cacheable {
+
+  /**
+   * This method will return the access count for a column based on which decision will be taken
+   * whether to keep the object in memory
+   *
+   * @return
+   */
+  override def getAccessCount: Int = {
+    0
+  }
+
+  /**
+   * This method will return the memory size of the cached partition
+   *
+   * @return
+   */
+  override def getMemorySize: Long = {
+    partitionSpecs.values.flatMap {
+      partitionSpec =>
+        partitionSpec._1.map { specs =>
+          if (specs.stats.isDefined) {
+            specs.stats.get.sizeInBytes.toLong
+          } else {
+            SizeEstimator.estimate(specs)
+          }
+        }
+    }.sum
+  }
+
+  /**
+   * Method to be used for invalidating the cacheable object. API to be invoked at the time of
+   * removing the cacheable object from memory. Example at removing the cacheable object
+   * from LRU cache
+   */
+  override def invalidate(): Unit = {
+
+  }
+}
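
A usage sketch of the new cache API (carbonTable and expirationTime are assumed to be in scope; the new tests further below build the key the same way, with a fixed expiration of 1L):

    import org.apache.spark.util.{PartitionCacheKey, PartitionCacheManager}

    // The key combines the table id, the table path and the cache expiration time.
    val key = PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, expirationTime)
    // get(): read or refresh the per-segment partition specs from the segment files
    // and return the partitions of all valid segments.
    val partitions = PartitionCacheManager.get(key)
    // getIfPresent(): return only what is already cached for this table.
    val cached = PartitionCacheManager.getIfPresent(key)
    // invalidate(): drop this table's entry from the driver LRU cache.
    PartitionCacheManager.invalidate(key)
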
diff --git a/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala b/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index 8a9f8f3..44e556a 100644
--- a/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
+++ b/integration/spark/src/main/spark2.3/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
 /**
@@ -117,12 +118,12 @@ class CarbonHiveSessionCatalog(
    * hive and then apply filter instead of querying hive along with filters.
    * @param partitionFilters
    * @param sparkSession
-   * @param identifier
+   * @param carbonTable
    * @return
    */
   override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession, identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionCatalogUtil.getPartitionsAlternate(partitionFilters, sparkSession, identifier)
+      sparkSession: SparkSession, carbonTable: CarbonTable): Seq[CatalogTablePartition] = {
+    CarbonSessionCatalogUtil.getPartitionsAlternate(partitionFilters, sparkSession, carbonTable)
   }
 
   /**
diff --git a/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala b/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
index 92c65bb..28b1e72 100644
--- a/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
+++ b/integration/spark/src/main/spark2.4/org/apache/spark/sql/hive/CarbonSessionStateBuilder.scala
@@ -30,10 +30,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.hive.client.HiveClient
-import org.apache.spark.sql.internal.{SessionState, SQLConf}
+import org.apache.spark.sql.internal.{SQLConf, SessionState}
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule}
 import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 
 /**
@@ -117,12 +118,12 @@ class CarbonHiveSessionCatalog(
    * hive and then apply filter instead of querying hive along with filters.
    * @param partitionFilters
    * @param sparkSession
-   * @param identifier
+   * @param carbonTable
    * @return
    */
   override def getPartitionsAlternate(partitionFilters: Seq[Expression],
-      sparkSession: SparkSession, identifier: TableIdentifier): Seq[CatalogTablePartition] = {
-    CarbonSessionCatalogUtil.getPartitionsAlternate(partitionFilters, sparkSession, identifier)
+      sparkSession: SparkSession, carbonTable: CarbonTable): Seq[CatalogTablePartition] = {
+    CarbonSessionCatalogUtil.getPartitionsAlternate(partitionFilters, sparkSession, carbonTable)
   }
 
   /**
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 5e2acaa..a84cbee 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -22,11 +22,13 @@ import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.util.{PartitionCacheKey, PartitionCacheManager}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.Strings
@@ -585,6 +587,43 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     }
   }
 
+  test("test partition caching") {
+    CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
+    sql("drop table if exists partition_cache")
+    sql("create table partition_cache(a string) partitioned by(b int) stored as carbondata")
+    sql("insert into partition_cache select 'k',1")
+    sql("select * from partition_cache where b = 1").collect()
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "partition_cache")
+    var partitionSpecs: util.List[CatalogTablePartition] = PartitionCacheManager.getIfPresent(
+      PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+    assert(partitionSpecs.size == 1)
+    sql("insert into partition_cache select 'k',2")
+    sql("select * from partition_cache where b = 2").collect()
+    sql("select * from partition_cache where b = 2").collect()
+    partitionSpecs = PartitionCacheManager.getIfPresent(
+      PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+    assert(partitionSpecs.size == 2)
+    sql("delete from table partition_cache where segment.id in (1)")
+    sql("select * from partition_cache where b = 2").collect()
+    partitionSpecs = PartitionCacheManager.getIfPresent(
+      PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+    assert(partitionSpecs.size == 1)
+    CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
+  }
+
+  test("test partition caching after load") {
+    CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "false")
+    sql("drop table if exists partition_cache")
+    sql("create table partition_cache(a string) partitioned by(b int) stored as carbondata")
+    sql("insert into partition_cache select 'k',1")
+    sql("select * from partition_cache where b = 1").collect()
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "partition_cache")
+    val partitionSpecs = PartitionCacheManager.getIfPresent(
+      PartitionCacheKey(carbonTable.getTableId, carbonTable.getTablePath, 1L))
+    assert(partitionSpecs.size == 1)
+    CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
+  }
+
   def verifyInsertForPartitionTable(tableName: String, sort_scope: String): Unit = {
     sql(s"drop table if exists $tableName")
     sql(
@@ -661,6 +700,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
 
 
   override def afterAll = {
+    CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct", "true")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)