You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/05/13 05:15:47 UTC

[carbondata] branch master updated: [CARBONDATA-3801][CARBONDATA-3805][CARBONDATA-3809] Query on partition table with SI having multiple partition columns gives empty results

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ccaf47  [CARBONDATA-3801][CARBONDATA-3805][CARBONDATA-3809] Query on partition table with SI having multiple partition columns gives empty results
5ccaf47 is described below

commit 5ccaf4703b99a9903723df57d4629fe6e135e25c
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed May 6 16:15:38 2020 +0530

    [CARBONDATA-3801][CARBONDATA-3805][CARBONDATA-3809] Query on partition table with SI having multiple partition
    columns gives empty results
    
    Why is this PR needed?
    1. Query on partition table with SI having multiple partition columns gives empty results. Because while loading data to SI table,
    blockid -> partition directory path is replaced by '#' instead of '/' to support multi level partitioning. Hence block id will be
    like part1=1#part2=2/xxxxxxxxx. During query the above block id is compared with actual block id part1=1/part2=2/xxxxxxxxx,
    which do not match, and provides empty results.
    2. Drop Index throws exception while starting beeline, that error while editing schema file
    3. Refresh syntax in SI doc is wrong.
    
    What changes were proposed in this PR?
    1. During query, convert block id as in Load flow in case of partition table, to support multi level partitioning.
    2. Remove modifying schema mdt file in index flow.
    3. Updated Refresh syntax
    
    This closes #3748
---
 .../core/indexstore/blockletindex/BlockIndex.java  | 12 +++++--
 docs/index/index-management.md                     |  6 ++++
 docs/index/secondary-index-guide.md                |  4 +--
 .../secondaryindex/TestSIWithPartition.scala       | 23 ++++++++++++
 .../sql/hive/CarbonHiveIndexMetadataUtil.scala     |  4 ---
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  4 +--
 .../sql/secondaryindex/util/FileInternalUtil.scala | 41 ----------------------
 7 files changed, 42 insertions(+), 52 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
index 170d383..3ab3cd4 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockIndex.java
@@ -63,6 +63,7 @@ import org.apache.carbondata.core.scan.filter.executer.ImplicitColumnFilterExecu
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.util.BlockletIndexUtil;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
@@ -793,9 +794,14 @@ public class BlockIndex extends CoarseGrainIndex
     BitSet bitSet = null;
     if (filterExecuter instanceof ImplicitColumnFilterExecutor) {
       String uniqueBlockPath;
-      if (segmentPropertiesWrapper.getCarbonTable().isHivePartitionTable()) {
-        uniqueBlockPath = filePath
-            .substring(segmentPropertiesWrapper.getCarbonTable().getTablePath().length() + 1);
+      CarbonTable carbonTable = segmentPropertiesWrapper.getCarbonTable();
+      if (carbonTable.isHivePartitionTable()) {
+        // While data loading to SI created on Partition table, on partition directory, '/' will be
+        // replaced with '#', to support multi level partitioning. For example, BlockId will be
+        // look like `part1=1#part2=2/xxxxxxxxx`. During query also, blockId should be
+        // replaced by '#' in place of '/', to match and prune data on SI table.
+        uniqueBlockPath = CarbonUtil
+            .getBlockId(carbonTable.getAbsoluteTableIdentifier(), filePath, "", true, false, true);
       } else {
         uniqueBlockPath = filePath.substring(filePath.lastIndexOf("/Part") + 1);
       }
diff --git a/docs/index/index-management.md b/docs/index/index-management.md
index 7bd9c75..b56f389 100644
--- a/docs/index/index-management.md
+++ b/docs/index/index-management.md
@@ -39,6 +39,12 @@ AS carbondata/bloomfilter/lucene
 [PROPERTIES ('key'='value')]
 ```
 
+Index can be refreshed using following DDL
+
+```
+REFRESH INDEX index_name ON [TABLE] [db_name.]table_name
+```
+
 Currently, there are 3 Index implementations in CarbonData.
 
 | Index Provider   | Description                                                                      | Management |
diff --git a/docs/index/secondary-index-guide.md b/docs/index/secondary-index-guide.md
index 1d86b82..cf22097 100644
--- a/docs/index/secondary-index-guide.md
+++ b/docs/index/secondary-index-guide.md
@@ -142,12 +142,12 @@ Where there are so many small files present in the SI table, then we can use the
 compact the files within an SI segment to avoid many small files.
 
   ```
-  REFRESH INDEX sales_index
+  REFRESH INDEX sales_index ON TABLE sales
   ```
 This command merges data files in each segment of the SI table.
 
   ```
-  REFRESH INDEX sales_index WHERE SEGMENT.ID IN(1)
+  REFRESH INDEX sales_index ON TABLE sales WHERE SEGMENT.ID IN(1)
   ```
 This command merges data files within a specified segment of the SI table.
 
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
index 3581211..31bd4a2 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithPartition.scala
@@ -356,8 +356,31 @@ class TestSIWithPartition extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test secondary index with partition table having mutiple partition columns") {
+    sql("drop table if exists partition_table")
+    sql(s"""
+         | CREATE TABLE partition_table (stringField string, intField int, shortField short, stringField1 string)
+         | STORED AS carbondata
+         | PARTITIONED BY (hour_ string, date_ string, sec_ string)
+         | TBLPROPERTIES ('SORT_COLUMNS'='hour_,date_,stringField', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"drop index if exists si_on_multi_part on partition_table")
+    sql(s"create index si_on_multi_part on partition_table(stringField1) as 'carbondata'")
+    sql("insert into partition_table select 'abc', 1,123,'abc1',2,'mon','ten'")
+    checkAnswer(sql(s"select count(*) from si_on_multi_part"), Seq(Row(1)))
+    val dataFrame = sql(s"select stringField,date_,sec_ from partition_table where stringField1='abc1'")
+    checkAnswer(dataFrame, Seq(Row("abc","mon","ten")))
+    if (!isFilterPushedDownToSI(dataFrame.queryExecution.sparkPlan)) {
+      assert(false)
+    } else {
+      assert(true)
+    }
+    sql("drop table if exists partition_table")
+  }
+
   override protected def afterAll(): Unit = {
     sql("drop index if exists indextable1 on uniqdata1")
     sql("drop table if exists uniqdata1")
+    sql("drop table if exists partition_table")
   }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala
index 57fb05e..a7871fb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonHiveIndexMetadataUtil.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.index.CarbonIndexUtil
-import org.apache.spark.sql.secondaryindex.util.FileInternalUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -106,9 +105,6 @@ object CarbonHiveIndexMetadataUtil {
     sparkSession.sql(
       s"""ALTER TABLE $dbName.$parentTableName SET SERDEPROPERTIES ('indexInfo'='$newIndexInfo')
         """.stripMargin).collect()
-    FileInternalUtil.touchSchemaFileTimestamp(dbName, parentTableName,
-      parentCarbonTable.getTablePath, System.currentTimeMillis())
-    FileInternalUtil.touchStoreTimeStamp()
     refreshTable(dbName, parentTableName, sparkSession)
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index f346ea1..6213caf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -99,7 +99,7 @@ object CarbonIndexUtil {
   def getCGAndFGIndexes(carbonTable: CarbonTable): java.util.Map[String,
     util.Map[String, util.Map[String, String]]] = {
     val indexMetadata = carbonTable.getIndexMetadata
-    val cgAndFgIndexes = if (null != indexMetadata) {
+    val cgAndFgIndexes = if (null != indexMetadata && null != indexMetadata.getIndexesMap) {
       val indexesMap = indexMetadata.getIndexesMap
       indexesMap.asScala.filter(provider =>
         !provider._1.equalsIgnoreCase(IndexType.SI.getIndexProviderName)).asJava
@@ -236,7 +236,7 @@ object CarbonIndexUtil {
   def getIndexCarbonTables(carbonTable: CarbonTable,
       sparkSession: SparkSession): Seq[CarbonTable] = {
     val indexMetadata = carbonTable.getIndexMetadata
-    val siIndexesMap = if (null != indexMetadata) {
+    val siIndexesMap = if (null != indexMetadata && null != indexMetadata.getIndexesMap) {
       indexMetadata.getIndexesMap.get(IndexType.SI.getIndexProviderName)
     } else {
       new util.HashMap[String, util.Map[String, util.Map[String, String]]]()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala
index 1be3b3d..46c318e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/FileInternalUtil.scala
@@ -23,14 +23,10 @@ import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.index.CarbonIndexUtil
 
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
@@ -39,30 +35,6 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
  */
 object FileInternalUtil {
 
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * This method will check and create an empty schema timestamp file
-   *
-   * @return
-   */
-  def touchStoreTimeStamp(): Long = {
-    val timestampFile = getTimestampFileAndType()
-    val systemTime = System.currentTimeMillis()
-    FileFactory.getCarbonFile(timestampFile)
-      .setLastModifiedTime(systemTime)
-    systemTime
-  }
-
-  private def getTimestampFileAndType() = {
-    // if mdt file path is configured then take configured path else take default path
-    val configuredMdtPath = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
-        CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER_DEFAULT).trim
-    val timestampFile = configuredMdtPath + "/" + CarbonCommonConstants.SCHEMAS_MODIFIED_TIME_FILE
-    CarbonUtil.checkAndAppendFileSystemURIScheme(timestampFile)
-  }
-
   def updateTableStatus(
     validSegments: List[String],
     databaseName: String,
@@ -114,17 +86,4 @@ object FileInternalUtil {
     )
     status
   }
-
-
-
-  def touchSchemaFileTimestamp(dbName: String,
-      tableName: String,
-      tablePath: String,
-      schemaTimeStamp: Long): Unit = {
-    val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
-    if (FileFactory.isFileExist(tableMetadataFile)) {
-      FileFactory.getCarbonFile(tableMetadataFile)
-        .setLastModifiedTime(schemaTimeStamp)
-    }
-  }
 }