You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2017/12/13 14:02:25 UTC

carbondata git commit: [CARBONDATA-1891] Fixed timeseries table creation after data load

Repository: carbondata
Updated Branches:
  refs/heads/master 6a185b834 -> 27b36e4e8


[CARBONDATA-1891] Fixed timeseries table creation after data load

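CreatePreAggregateTableCommand.run() was being called from CarbonCreateDataMapCommand.processMetadata(). Because of this, the wrong table was being created: one that is not present in the parent table's DataMapSchemaList. Refactored the code to call CreatePreAggregateTableCommand.processMetadata() instead of run(). processData() and undoMetadata() now also iterate over the timeseries hierarchy, so each per-granularity child table (<datamap>_<granularity>) is loaded or rolled back individually. PreAggregateUtil now throws a descriptive error, listing the known child tables, when the datamap is not found in the DataMapSchemaList.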

This closes #1651


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/27b36e4e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/27b36e4e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/27b36e4e

Branch: refs/heads/master
Commit: 27b36e4e8949540c22b0b24db35128bf8607061d
Parents: 6a185b8
Author: kunal642 <ku...@gmail.com>
Authored: Wed Dec 13 14:55:09 2017 +0530
Committer: kumarvishal <ku...@gmail.com>
Committed: Wed Dec 13 19:32:09 2017 +0530

----------------------------------------------------------------------
 .../timeseries/TestTimeseriesDataLoad.scala     | 14 ++++
 .../datamap/CarbonCreateDataMapCommand.scala    | 70 +++++++++++++++-----
 .../preaaggregate/PreAggregateUtil.scala        | 13 +++-
 3 files changed, 77 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/27b36e4e/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
index 217edea..6a0ea62 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -73,6 +73,20 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
         Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50)))
   }
 
+  test("test if timeseries load is successful on table creation") {
+    sql("drop table if exists mainTable")
+    sql("CREATE TABLE mainTable(mytime timestamp, name string, age int) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
+    sql("create datamap agg0 on table mainTable using 'preaggregate' DMPROPERTIES ('timeseries.eventTime'='mytime', 'timeseries.hierarchy'='second=1,minute=1,hour=1,day=1,month=1,year=1') as select mytime, sum(age) from mainTable group by mytime")
+    checkAnswer( sql("select * from maintable_agg0_second"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"),10),
+        Row(Timestamp.valueOf("2016-02-23 01:01:40.0"),20),
+        Row(Timestamp.valueOf("2016-02-23 01:01:50.0"),30),
+        Row(Timestamp.valueOf("2016-02-23 01:02:30.0"),40),
+        Row(Timestamp.valueOf("2016-02-23 01:02:40.0"),50),
+        Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50)))
+  }
+
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/27b36e4e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index a3aa36d..574c31a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -64,7 +64,7 @@ case class CarbonCreateDataMapCommand(
             dmClassName,
             updatedDmProperties,
             queryString.get,
-            Some(f._1)).run(sparkSession)
+            Some(f._1)).processMetadata(sparkSession)
         }
       }
       else {
@@ -90,13 +90,32 @@ case class CarbonCreateDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
         dmClassName.equalsIgnoreCase("preaggregate")) {
-      CreatePreAggregateTableCommand(
-        dataMapName,
-        tableIdentifier,
-        dmClassName,
-        dmproperties,
-        queryString.get
-      ).processData(sparkSession)
+      val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
+      if (timeHierarchyString.isDefined) {
+        val details = TimeSeriesUtil
+          .validateAndGetTimeSeriesHierarchyDetails(
+            timeHierarchyString.get)
+        val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY
+        details.foreach { f =>
+          CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
+            tableIdentifier,
+            dmClassName,
+            updatedDmProperties,
+            queryString.get,
+            Some(f._1)).processData(sparkSession)
+        }
+        Seq.empty
+      }
+      else {
+        CreatePreAggregateTableCommand(
+          dataMapName,
+          tableIdentifier,
+          dmClassName,
+          dmproperties,
+          queryString.get
+        ).processData(sparkSession)
+        Seq.empty
+      }
     } else {
       Seq.empty
     }
@@ -105,18 +124,35 @@ case class CarbonCreateDataMapCommand(
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
         dmClassName.equalsIgnoreCase("preaggregate")) {
-      CreatePreAggregateTableCommand(
-        dataMapName,
-        tableIdentifier,
-        dmClassName,
-        dmproperties,
-        queryString.get
-      ).undoMetadata(sparkSession, exception)
+      val timeHierarchyString = dmproperties.get(CarbonCommonConstants.TIMESERIES_HIERARCHY)
+      if (timeHierarchyString.isDefined) {
+        val details = TimeSeriesUtil
+          .validateAndGetTimeSeriesHierarchyDetails(
+            timeHierarchyString.get)
+        val updatedDmProperties = dmproperties - CarbonCommonConstants.TIMESERIES_HIERARCHY
+        details.foreach { f =>
+          CreatePreAggregateTableCommand(dataMapName + '_' + f._1,
+            tableIdentifier,
+            dmClassName,
+            updatedDmProperties,
+            queryString.get,
+            Some(f._1)).undoMetadata(sparkSession, exception)
+        }
+        Seq.empty
+      }
+      else {
+        CreatePreAggregateTableCommand(
+          dataMapName,
+          tableIdentifier,
+          dmClassName,
+          dmproperties,
+          queryString.get
+        ).undoMetadata(sparkSession, exception)
+        Seq.empty
+      }
     } else {
       Seq.empty
     }
   }
-
 }
 
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/27b36e4e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 81ccbd2..6b24f46 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -515,9 +515,16 @@ object PreAggregateUtil {
       CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
       parentCarbonTable.getDatabaseName + "." +
       parentCarbonTable.getTableName, validateSegments.toString)
-    val headers = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.
-      find(_.getChildSchema.getTableName.equals(dataMapIdentifier.table)).get.getChildSchema.
-      getListOfColumns.asScala.map(_.getColumnName).mkString(",")
+    val dataMapSchemas = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala
+    val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase(
+      dataMapIdentifier.table)) match {
+      case Some(dataMapSchema) =>
+        dataMapSchema.getChildSchema.getListOfColumns.asScala.map(_.getColumnName).mkString(",")
+      case None =>
+        throw new RuntimeException(
+          s"${ dataMapIdentifier.table} datamap not found in DataMapSchema list: ${
+          dataMapSchemas.map(_.getChildSchema.getTableName).mkString("[", ",", "]")}")
+    }
     val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
       queryString)).drop("preAggLoad")
     try {