Posted to commits@carbondata.apache.org by aj...@apache.org on 2019/12/13 03:43:49 UTC

[carbondata] branch master updated: [CARBONDATA-3600] Fix creating mv timeseries UDF column as partition column

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 75cab45  [CARBONDATA-3600] Fix creating mv timeseries UDF column as partition column
75cab45 is described below

commit 75cab4560c109de369ad3f830b5fed8b77e27182
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Sun Dec 1 16:21:04 2019 +0530

    [CARBONDATA-3600] Fix creating mv timeseries UDF column as partition column
    
    Problem:
    Issue 1:
    Creating a datamap whose timeseries UDF takes a partition column as input
    throws an exception.
    Issue 2:
    If the JDBC application is killed while a create datamap operation is in
    progress, a "datamap table not found" exception is thrown on restart.
    
    Solution:
    Issue 1:
    While adding fields to FieldRelationMap, UDF expressions need not be added
    in the group-by case. If a partition column is present in the timeseries
    UDF, the child table will not inherit partition fields from the parent
    table.
    Issue 2:
    Clean up the invalid table and schema from the store.
    
    This closes #3493
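
For context, a minimal reproduction of Issue 1, adapted from the new test added
to TestPartitionWithMV.scala in this patch (names follow that test; simplified
here to a single partition column):

    sql("create table partitionone(a int, b int) " +
      "partitioned by (c timestamp) stored by 'carbondata'")
    // Before this fix, the create datamap below threw an exception because the
    // child (datamap) table tried to inherit 'c' as a partition column, even
    // though 'c' only appears inside the timeseries UDF. After the fix, the
    // datamap is created without inheriting the partition field.
    sql("create datamap dm1 on table partitionone using 'mv' as " +
      "select timeseries(c,'day'), sum(b) from partitionone " +
      "group by timeseries(c,'day')")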
---
 .../carbondata/mv/datamap/MVDataMapProvider.scala  | 13 +++++--
 .../apache/carbondata/mv/datamap/MVHelper.scala    | 10 +++---
 .../org/apache/carbondata/mv/datamap/MVUtil.scala  | 42 +++++++++++-----------
 .../mv/rewrite/TestPartitionWithMV.scala           | 18 ++++++++++
 .../timeseries/TestMVTimeSeriesLoadAndQuery.scala  | 12 +++++++
 .../scala/org/apache/spark/sql/CarbonEnv.scala     | 36 +++++++++++++++++++
 6 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
index 15508fd..6499a8f 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -95,8 +95,17 @@ class MVDataMapProvider(
       dataMapSchema.getRelationIdentifier.getTableName,
       true)
     dropTableCommand.processMetadata(sparkSession)
-    DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
-    DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+    // Drop the datamap schema first, then unregister the datamap from the catalog:
+    // if unregistering fails, the datamap schema would otherwise remain in the system
+    // and the datamap could not be created again.
+    try {
+      DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+    } catch {
+      case e: IOException =>
+        throw e
+    } finally {
+      DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
+    }
   }
 
   override def cleanData(): Unit = {
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
index 7ff4c8a..29afec1 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -305,10 +305,10 @@ object MVHelper {
   }
 
   private def isValidSelect(isValidExp: Boolean,
-      s: Select): Boolean = {
+      filterPredicate: Seq[Expression], outputList: Seq[NamedExpression]): Boolean = {
     // Make sure all predicates are present in projections.
     var predicateList: Seq[AttributeReference] = Seq.empty
-    s.predicateList.map { f =>
+    filterPredicate.map { f =>
       f.children.collect {
         case p: AttributeReference =>
           predicateList = predicateList.+:(p)
@@ -321,7 +321,7 @@ object MVHelper {
     }
     if (predicateList.nonEmpty) {
       predicateList.forall { p =>
-        s.outputList.exists {
+        outputList.exists {
           case a: Alias =>
             a.semanticEquals(p) || a.child.semanticEquals(p) || a.collect {
               case attr: AttributeReference =>
@@ -364,11 +364,11 @@ object MVHelper {
         }
         g.child match {
           case s: Select =>
-            isValidSelect(isValidExp, s)
+            isValidSelect(isValidExp, s.predicateList, g.outputList)
           case m: ModularRelation => isValidExp
         }
       case s: Select =>
-        isValidSelect(true, s)
+        isValidSelect(true, s.predicateList, s.outputList)
       case _ => true
     }
     if (!isValid) {
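
At the SQL level, the rule the refactored isValidSelect now enforces for the
GroupBy case is that filter and group-by columns must appear in the projection,
checked against the group-by node's own output list. A sketch, adapted from the
new test added to TestMVTimeSeriesLoadAndQuery.scala in this patch:

    // Rejected with "Group by/Filter columns must be present in project columns":
    // empname is filtered and grouped on, but not projected.
    sql("create datamap dm using 'mv' as " +
      "select timeseries(projectjoindate,'day') from maintable " +
      "where empname='chandler' group by timeseries(projectjoindate,'day'), empname")
    // Accepted: empname is added to the projection.
    sql("create datamap dm using 'mv' as " +
      "select timeseries(projectjoindate,'day'), empname from maintable " +
      "where empname='chandler' group by timeseries(projectjoindate,'day'), empname")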
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
index 7e6ee02..74b0474 100644
--- a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVUtil.scala
@@ -204,27 +204,27 @@ class MVUtil {
           "")
       }
     }
-    groupByExp map { grp =>
-      grp.collect {
-        case attr: AttributeReference =>
-          val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
-          if (null != carbonTable) {
-            val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
-                ArrayBuffer[ColumnTableRelation]()
-            arrayBuffer += getColumnRelation(attr.name,
-              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-              carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-              carbonTable)
-            fieldToDataMapFieldMap +=
-            getFieldToDataMapFields(attr.name,
-              attr.dataType,
-              attr.qualifier,
-              "",
-              arrayBuffer,
-              carbonTable.getTableName)
-          }
-      }
+    groupByExp map {
+      case attr: AttributeReference =>
+        val carbonTable: CarbonTable = getCarbonTable(logicalRelation, attr)
+        if (null != carbonTable) {
+          val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new
+              ArrayBuffer[ColumnTableRelation]()
+          arrayBuffer += getColumnRelation(attr.name,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+            carbonTable)
+          fieldToDataMapFieldMap +=
+          getFieldToDataMapFields(attr.name,
+            attr.dataType,
+            attr.qualifier,
+            "",
+            arrayBuffer,
+            carbonTable.getTableName)
+        }
+        attr
+      case _ =>
     }
     fieldToDataMapFieldMap
   }
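
The substance of this hunk is the switch from collect inside each group-by
expression to a top-level pattern match: collect descends into child nodes, so
it also picked up attribute references nested inside UDF calls such as
timeseries(c,'day'), which is how partition columns referenced only through the
UDF ended up registered as datamap fields. A self-contained sketch of the
distinction, using Catalyst's Year as a stand-in for the timeseries UDF (the
expressions are hypothetical; the API calls are standard Spark Catalyst):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Year}
    import org.apache.spark.sql.types.{IntegerType, TimestampType}

    // Hypothetical group-by list: a bare column 'b' and a column 'c' wrapped in a call.
    val b = AttributeReference("b", IntegerType)()
    val c = AttributeReference("c", TimestampType)()
    val groupByExp: Seq[Expression] = Seq(b, Year(c))

    // Old behaviour: collect descends into children, so 'c' is found inside Year(c)
    // and a partition column referenced only through the UDF becomes a datamap field.
    val nested = groupByExp.flatMap(_.collect { case a: AttributeReference => a })
    // nested == Seq(b, c)

    // New behaviour: only top-level attribute references match; 'c' is skipped.
    val topLevel = groupByExp.collect { case a: AttributeReference => a }
    // topLevel == Seq(b)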
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
index 3cb0227..0d5b645 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestPartitionWithMV.scala
@@ -685,4 +685,22 @@ class TestPartitionWithMV extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists partitionone")
   }
 
+  test("test partition on timeseries column") {
+    sql("drop table if exists partitionone")
+    sql("create table partitionone(a int,b int) partitioned by (c timestamp,d timestamp) stored by 'carbondata'")
+    sql("insert into partitionone values(1,2,'2017-01-01 01:00:00','2018-01-01 01:00:00')")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table partitionone using 'mv' as select timeseries(c,'day'),sum(b) from partitionone group by timeseries(c,'day')")
+    assert(!CarbonEnv.getCarbonTable(Some("partition_mv"),"dm1_table")(sqlContext.sparkSession).isHivePartitionTable)
+    assert(sql("select timeseries(c,'day'),sum(b) from partitionone group by timeseries(c,'day')").count() == 1)
+    sql("drop table if exists partitionone")
+    sql("create table partitionone(a int,b timestamp) partitioned by (c timestamp) stored by 'carbondata'")
+    sql("insert into partitionone values(1,'2017-01-01 01:00:00','2018-01-01 01:00:00')")
+    sql("drop datamap if exists dm1")
+    sql("create datamap dm1 on table partitionone using 'mv' as select timeseries(b,'day'),c from partitionone group by timeseries(b,'day'),c")
+    assert(CarbonEnv.getCarbonTable(Some("partition_mv"),"dm1_table")(sqlContext.sparkSession).isHivePartitionTable)
+    assert(sql("select timeseries(b,'day'),c from partitionone group by timeseries(b,'day'),c").count() == 1)
+    sql("drop table if exists partitionone")
+  }
+
 }
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
index 8815a94..3bd1e60 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/timeseries/TestMVTimeSeriesLoadAndQuery.scala
@@ -329,6 +329,18 @@ class TestMVTimeSeriesLoadAndQuery extends QueryTest with BeforeAndAfterAll {
     checkPlan("datamap1", df)
   }
 
+  test("test create datamap with group by & filter columns not present in projection") {
+    sql("drop datamap if exists dm ")
+    assert(intercept[UnsupportedOperationException] {
+      sql("create datamap dm using 'mv' as select timeseries(projectjoindate,'day') from maintable where empname='chandler' group by timeseries(projectjoindate,'day'),empname")
+    }.getMessage.contains("Group by/Filter columns must be present in project columns"))
+    sql("create datamap dm using 'mv' as select timeseries(projectjoindate,'day'),empname from maintable where empname='chandler' group by timeseries(projectjoindate,'day'),empname")
+    var df = sql("select timeseries(projectjoindate,'day'),empname from maintable where empname='chandler' group by timeseries(projectjoindate,'day'),empname")
+    checkPlan("dm", df)
+    df = sql("select timeseries(projectjoindate,'day') from maintable where empname='chandler' group by timeseries(projectjoindate,'day'),empname")
+    checkPlan("dm", df)
+    sql("drop datamap if exists dm ")
+  }
 
   override def afterAll(): Unit = {
     drop()
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 3616cf6..37db135 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql
 
+import java.io.IOException
 import java.util.concurrent.ConcurrentHashMap
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -121,11 +124,44 @@ class CarbonEnv {
         CarbonProperties.getInstance
           .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
         initialized = true
+        cleanChildTablesNotRegisteredInHive(sparkSession)
       }
     }
     Profiler.initialize(sparkSession.sparkContext)
     LOGGER.info("Initialize CarbonEnv completed...")
   }
+
+  private def cleanChildTablesNotRegisteredInHive(sparkSession: SparkSession): Unit = {
+    // If the JDBC application is killed/stopped while a create datamap operation is in
+    // progress, the datamap table may have been created and the datamap schema saved to
+    // the system without the table being registered in the metastore. So, when the JDBC
+    // application restarts, clean up such stale tables and datamap schemas.
+    val dataMapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas
+    dataMapSchemas.asScala.foreach {
+      dataMapSchema =>
+        if (null != dataMapSchema.getRelationIdentifier &&
+            !dataMapSchema.isIndexDataMap) {
+          if (!sparkSession.sessionState
+            .catalog
+            .tableExists(TableIdentifier(dataMapSchema.getRelationIdentifier.getTableName,
+              Some(dataMapSchema.getRelationIdentifier.getDatabaseName)))) {
+            try {
+              DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+            } catch {
+              case e: IOException =>
+                throw e
+            } finally {
+              DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
+              if (FileFactory.isFileExist(dataMapSchema.getRelationIdentifier.getTablePath)) {
+                CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(dataMapSchema
+                  .getRelationIdentifier
+                  .getTablePath))
+              }
+            }
+          }
+        }
+    }
+  }
 }
 
 object CarbonEnv {