You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by in...@apache.org on 2022/06/27 10:52:12 UTC

[carbondata] branch master updated: [CARBONDATA-4344] Create MV fails with "LOCAL_DICTIONARY_INCLUDE/LOCAL _DICTIONARY_EXCLUDE column: does not exist in table. Please check the DDL" error

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 858afc7eb6 [CARBONDATA-4344] Create MV fails with "LOCAL_DICTIONARY_INCLUDE/LOCAL _DICTIONARY_EXCLUDE column: does not exist in table. Please check the DDL" error
858afc7eb6 is described below

commit 858afc7eb60508de1f9d5fc8df06099e83df3c15
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Wed Jun 22 17:11:23 2022 +0530

    [CARBONDATA-4344] Create MV fails with "LOCAL_DICTIONARY_INCLUDE/LOCAL _DICTIONARY_EXCLUDE column: does not exist in table. Please check the DDL" error
    
    Why is this PR needed?
    Create MV fails with "LOCAL_DICTIONARY_INCLUDE/LOCAL _DICTIONARY_EXCLUDE column: does not exist in table.
    Please check the DDL" error.
    Error occurs only in this scenario: Create Table --> Load --> Alter Add Columns --> Drop table --> Refresh Table --> Create MV
    and not in direct scenario like: Create Table --> Load --> Alter Add Columns --> Create MV
    
    What changes were proposed in this PR?
    1. After add column command, LOCAL_DICTIONARY_INCLUDE and LOCAL_DICTIONARY_EXCLUDE properties
       are added to the table even if the columns are empty. So, when MV is created next as
       LOCAL_DICTIONARY_EXCLUDE column is defined it tries to access its columns and fails.
       --> Added empty check before adding properties to the table to resolve this.
    2. In a direct scenario after add column, the schema gets updated in catalog table but
       the table properties are not updated. Made changes to update table properties to catalog table.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4282
---
 .../TestRegisterIndexCarbonTable.scala             | 30 ++----------------
 .../command/carbonTableSchemaCommon.scala          | 12 +++++---
 .../apache/spark/sql/hive/CarbonSessionUtil.scala  |  4 +++
 .../org/apache/spark/sql/test/util/QueryTest.scala | 27 ++++++++++++++++
 .../StandardPartitionTableLoadingTestCase.scala    | 27 +---------------
 .../carbondata/view/rewrite/MVCreateTestCase.scala | 36 ++++++++++++++++++++++
 .../register/TestRegisterCarbonTable.scala         | 25 ---------------
 7 files changed, 78 insertions(+), 83 deletions(-)

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
index 0b4aa5ba87..c38554f73c 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestRegisterIndexCarbonTable.scala
@@ -16,9 +16,6 @@
  */
 package org.apache.carbondata.spark.testsuite.secondaryindex
 
-import java.io.{File, IOException}
-
-import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
@@ -35,29 +32,6 @@ class TestRegisterIndexCarbonTable extends QueryTest with BeforeAndAfterAll {
     sql("drop database if exists carbon cascade")
   }
 
-  private def restoreData(dblocation: String, tableName: String) = {
-    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-      FileUtils.deleteDirectory(new File(source))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data restore failed.")
-    } finally {
-
-    }
-  }
-  private def backUpData(dblocation: String, tableName: String) = {
-    val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data backup failed.")
-    }
-  }
   test("register tables test") {
     val location = TestQueryExecutor.warehouse +
                            CarbonCommonConstants.FILE_SEPARATOR + "dbName"
@@ -68,8 +42,8 @@ class TestRegisterIndexCarbonTable extends QueryTest with BeforeAndAfterAll {
         "c1 string,c2 int,c3 string,c5 string) STORED AS carbondata")
     sql("insert into carbontable select 'a',1,'aa','aaa'")
     sql("create index index_on_c3 on table carbontable (c3, c5) AS 'carbondata'")
-    backUpData(location, "carbontable")
-    backUpData(location, "index_on_c3")
+    backUpData(location, None, "carbontable")
+    backUpData(location, None, "index_on_c3")
     sql("drop table carbontable")
     restoreData(location, "carbontable")
     restoreData(location, "index_on_c3")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index dff6ac1727..6e37fb8402 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -514,10 +514,14 @@ class AlterTableColumnSchemaGenerator(
 
     // The Final Map should contain the combined Local Dictionary Include and
     // Local Dictionary Exclude Columns from both Main table and Alter table
-    tablePropertiesMap
-      .put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE, localDictionaryIncludeColumn.toString())
-    tablePropertiesMap
-      .put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE, localDictionaryExcludeColumn.toString())
+    if (localDictionaryIncludeColumn.toString().nonEmpty) {
+      tablePropertiesMap.put(CarbonCommonConstants.LOCAL_DICTIONARY_INCLUDE,
+        localDictionaryIncludeColumn.toString())
+    }
+    if (localDictionaryExcludeColumn.toString().nonEmpty) {
+      tablePropertiesMap.put(CarbonCommonConstants.LOCAL_DICTIONARY_EXCLUDE,
+        localDictionaryExcludeColumn.toString())
+    }
     // This part will create dictionary file for all newly added dictionary columns
     // if valid default value is provided,
     // then that value will be included while creating dictionary file
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
index 9899617a1d..4a91b2b99a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonSessionUtil.scala
@@ -173,6 +173,10 @@ object CarbonSessionUtil {
       .alterTableDataSchema(tableIdentifier.database.get,
         tableIdentifier.table,
         StructType(colArray))
+    // Updates the table properties in catalog table.
+    CarbonSessionCatalogUtil.alterTableProperties(
+      sparkSession, tableIdentifier,
+      carbonTable.getTableInfo.getFactTable.getTableProperties.asScala.toMap, Seq.empty)
   }
 
   def updateCachedPlan(plan: LogicalPlan): LogicalPlan = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
index 771ab3ed7e..7c0f5b3c0b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/test/util/QueryTest.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.test.util
 
+import java.io.{File, IOException}
 import java.util.{Locale, TimeZone}
 
 import scala.collection.JavaConverters._
 
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.{CarbonEnv, CarbonToSparkAdapter, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
@@ -221,6 +223,31 @@ class QueryTest extends PlanTest {
       }
     }
   }
+
+  def restoreData(dblocation: String, tableName: String): Unit = {
+    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+      FileUtils.deleteDirectory(new File(source))
+    } catch {
+      case e: Exception =>
+        throw new IOException("carbon table data restore failed.")
+    } finally {
+
+    }
+  }
+
+  def backUpData(dblocation: String, database: Option[String], tableName: String): Unit = {
+    val source = CarbonEnv.getTablePath(database, tableName)(sqlContext.sparkSession)
+    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      FileUtils.copyDirectory(new File(source), new File(destination))
+    } catch {
+      case e: Exception =>
+        throw new IOException("carbon table data backup failed.")
+    }
+  }
 }
 
 object QueryTest {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 8dfc225dea..e890890922 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -405,7 +405,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val partitions = sql("show partitions restorepartition").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default_restorepartition")
     val dblocation = table.getTablePath.substring(0, table.getTablePath.lastIndexOf("/"))
-    backUpData(dblocation, "restorepartition")
+    backUpData(dblocation, None, "restorepartition")
     sql("drop table restorepartition")
     if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore) {
       restoreData(dblocation, "restorepartition")
@@ -741,31 +741,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     (Strings.formatSize(dataSize.toFloat), Strings.formatSize(indexSize.toFloat))
   }
 
-  private def restoreData(dblocation: String, tableName: String) = {
-    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-      FileUtils.deleteDirectory(new File(source))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data restore failed.")
-    } finally {
-
-    }
-  }
-
-  private def backUpData(dblocation: String, tableName: String) = {
-    val source = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data backup failed.", e)
-    }
-  }
-
 
   override def afterAll: Unit = {
     CarbonProperties.getInstance().addProperty("carbon.read.partition.hive.direct",
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
index fde0ae82d0..12b9a5a26c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.common.exceptions.sql.MalformedMVCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
@@ -1556,6 +1557,41 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "decimal_mv"))
   }
 
+  test("test create MV after alter add column, drop table and refresh") {
+    sql("drop table if exists source1")
+    sql("CREATE table source1 (empno int, empname String, " +
+        "designation String, doj Timestamp, workgroupcategory int, " +
+        "workgroupcategoryname String, deptno int, deptname String, projectcode int, " +
+        "projectjoindate Timestamp, projectenddate Timestamp, attendance int, " +
+        "utilization int,salary int) STORED AS CARBONDATA")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO " +
+        "TABLE source1 OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', " +
+        "'BAD_RECORDS_LOGGER_ENABLE'='FALSE', 'BAD_RECORDS_ACTION'='FORCE')")
+    sql("alter table source1 add columns(a int, b string) tblproperties " +
+        "('LONG_STRING_COLUMNS'='b')")
+    val table = CarbonMetadata.getInstance().getCarbonTable("default_source1")
+    val dblocation = table.getTablePath.substring(0, table.getTablePath.lastIndexOf("/"))
+    sql("drop MATERIALIZED VIEW if exists uniq2_mv")
+    sql("create MATERIALIZED VIEW uniq2_mv as " +
+        "select b, sum(empno) from source1 group by b")
+    val desc = sql("describe formatted uniq2_mv").collect
+    desc.find(_.get(0).toString.contains("LONG_STRING_COLUMNS")) match {
+      case Some(row) => assert(row.get(1).toString.contains("source1_b"))
+      case None => assert(false)
+    }
+    backUpData(dblocation, None, "source1")
+    sql("drop table source1")
+    if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetaStore.isReadFromHiveMetaStore) {
+      restoreData(dblocation, "source1")
+      sql("refresh table source1")
+    }
+    sql("drop MATERIALIZED VIEW if exists uniq2_mv")
+    sql("create MATERIALIZED VIEW uniq2_mv as " +
+        "select b, sum(empno) from source1 group by b")
+    sql("drop MATERIALIZED VIEW if exists uniq2_mv")
+    sql("drop table if exists source1")
+  }
+
   def copy(oldLoc: String, newLoc: String): Unit = {
     val oldFolder = FileFactory.getCarbonFile(oldLoc)
     FileFactory.mkdirs(newLoc, FileFactory.getConfiguration)
diff --git a/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala b/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
index 315dc048ab..f254c44085 100644
--- a/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
+++ b/integration/spark/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
@@ -38,31 +38,6 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterEach {
     sql("set carbon.enable.mv = true")
   }
 
-  private def restoreData(dblocation: String, tableName: String) = {
-    val destination = dblocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    val source = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-      FileUtils.deleteDirectory(new File(source))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data restore failed.")
-    } finally {
-
-    }
-  }
-
-  private def backUpData(dblocation: String, database: Option[String], tableName: String) = {
-    val source = CarbonEnv.getTablePath(database, tableName)(sqlContext.sparkSession)
-    val destination = dblocation + "_back" + CarbonCommonConstants.FILE_SEPARATOR + tableName
-    try {
-      FileUtils.copyDirectory(new File(source), new File(destination))
-    } catch {
-      case e : Exception =>
-        throw new IOException("carbon table data backup failed.")
-    }
-  }
-
   test("register tables test") {
     sql(s"create database carbon location '$dbLocation'")
     sql("use carbon")