You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/06 16:21:37 UTC

carbondata git commit: [CARBONDATA-1737] [CARBONDATA-1760] [PreAgg] Fixed partial load issue if user has set segments to access on parent table

Repository: carbondata
Updated Branches:
  refs/heads/master 0bbfa8597 -> 0e8707a60


[CARBONDATA-1737] [CARBONDATA-1760] [PreAgg] Fixed partial load issue if user has set segments to access on parent table

Analysis: Partial load was happening on pre-aggregate table when the user has set segments to access for the parent table.

Solution: Set segments to access to * before firing load for child table.

This closes #1613


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e8707a6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e8707a6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e8707a6

Branch: refs/heads/master
Commit: 0e8707a60cdef2604dc78c61e53bd349a7e90d28
Parents: 0bbfa85
Author: kunal642 <ku...@gmail.com>
Authored: Tue Dec 5 17:01:46 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Dec 6 21:51:23 2017 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggregateLoad.scala     | 17 +++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 22 ++++++---
 .../CreatePreAggregateTableCommand.scala        | 40 +++++++--------
 .../preaaggregate/PreAggregateListeners.scala   | 41 +++------------
 .../preaaggregate/PreAggregateUtil.scala        | 52 +++++++++++++++++++-
 5 files changed, 106 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 1502c53..569439c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -170,4 +170,21 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
     }.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table"))
   }
 
+  test("test whether all segments are loaded into pre-aggregate table if segments are set on main table") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
+    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
+    sql("set carbon.input.segments.default.maintable=0")
+    sql(
+      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id"""
+        .stripMargin)
+    sql("reset")
+    checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 67d75bd..1fa838c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -359,14 +359,20 @@ class CarbonScanRDD(
     }
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
-      CarbonTableInputFormat.setAggeragateTableSegments(conf, carbonSessionInfo.getSessionParams
-        .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-                     identifier.getCarbonTableIdentifier.getDatabaseName + "." +
-                     identifier.getCarbonTableIdentifier.getTableName, ""))
-      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
-          .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-                       identifier.getCarbonTableIdentifier.getDatabaseName + "." +
-                       identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+      val segmentsToScan = carbonSessionInfo.getSessionParams.getProperty(
+        CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+        identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+        identifier.getCarbonTableIdentifier.getTableName)
+      if (segmentsToScan != null) {
+        CarbonTableInputFormat.setAggeragateTableSegments(conf, segmentsToScan)
+      }
+      val validateSegments = carbonSessionInfo.getSessionParams.getProperty(
+        CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+        identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+        identifier.getCarbonTableIdentifier.getTableName)
+      if (validateSegments != null) {
+        CarbonTableInputFormat.setValidateSegmentsToAccess(conf, validateSegments.toBoolean)
+      }
     }
     format
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 6cee0e8..1ebf511 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -23,13 +23,13 @@ import scala.collection.mutable
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
-import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -69,7 +69,8 @@ case class CreatePreAggregateTableCommand(
       None)
 
     val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table))
+    assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
+      "Parent table name is different in select and create")
     // updating the relation identifier, this will be stored in child table
     // which can be used during dropping of pre-aggreate table as parent table will
     // also get updated
@@ -102,14 +103,11 @@ case class CreatePreAggregateTableCommand(
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     // drop child table and undo the change in table info of main table
-    CarbonDropTableCommand(
+    CarbonDropDataMapCommand(
+      dataMapName,
       ifExistsSet = true,
-      tableIdentifier.database,
-      tableIdentifier.table
-    ).run(sparkSession)
-
-    // TODO: undo the change in table info of main table
-
+      parentTableIdentifier.database,
+      parentTableIdentifier.table).run(sparkSession)
     Seq.empty
   }
 
@@ -124,19 +122,15 @@ case class CreatePreAggregateTableCommand(
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath)
       .nonEmpty
     if (loadAvailable) {
-      val headers = parentCarbonTable.getTableInfo.getFactTable.getListOfColumns.
-        asScala.map(_.getColumnName).mkString(",")
-      val childDataFrame = sparkSession.sql(
-        new CarbonSpark2SqlParser().addPreAggLoadFunction(queryString))
-      CarbonLoadDataCommand(tableIdentifier.database,
-        tableIdentifier.table,
-        null,
-        Nil,
-        Map("fileheader" -> headers),
-        isOverwriteTable = false,
-        dataFrame = Some(childDataFrame),
-        internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
-        run(sparkSession)
+      // Passing segmentToLoad as * because we want to load all the segments into the
+      // pre-aggregate table even if the user has set some segments on the parent table.
+      PreAggregateUtil.startDataLoadForDataMap(
+          parentCarbonTable,
+          tableIdentifier,
+          queryString,
+          segmentToLoad = "*",
+          validateSegments = true,
+          sparkSession = sparkSession)
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index d314488..90b728d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.CarbonSession
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -40,41 +41,15 @@ object LoadPostAggregateListener extends OperationEventListener {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     if (table.hasDataMapSchema) {
       for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
-        CarbonSession.threadSet(
-          CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-          carbonLoadModel.getDatabaseName + "." +
-          carbonLoadModel.getTableName,
-          carbonLoadModel.getSegmentId)
-        CarbonSession.threadSet(
-          CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-          carbonLoadModel.getDatabaseName + "." +
-          carbonLoadModel.getTableName, "false")
         val childTableName = dataMapSchema.getRelationIdentifier.getTableName
         val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
-        val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-          s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad")
-        val headers = dataMapSchema.getChildSchema.getListOfColumns.
-          asScala.map(_.getColumnName).mkString(",")
-        try {
-          CarbonLoadDataCommand(Some(childDatabaseName),
-            childTableName,
-            null,
-            Nil,
-            Map("fileheader" -> headers),
-            isOverwriteTable = false,
-            dataFrame = Some(childDataFrame),
-            internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
-            run(sparkSession)
-        } finally {
-          CarbonSession.threadUnset(
-            CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-            carbonLoadModel.getDatabaseName + "." +
-            carbonLoadModel.getTableName)
-          CarbonSession.threadUnset(
-            CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-            carbonLoadModel.getDatabaseName + "." +
-            carbonLoadModel.getTableName)
-        }
+        PreAggregateUtil.startDataLoadForDataMap(
+            table,
+            TableIdentifier(childTableName, Some(childDatabaseName)),
+            dataMapSchema.getProperties.get("CHILD_SELECT QUERY"),
+            carbonLoadModel.getSegmentId,
+            validateSegments = false,
+            sparkSession)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 43dc39e..95a711e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.preaaggregate
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, DataFrame, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
@@ -31,12 +31,14 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -493,4 +495,50 @@ object PreAggregateUtil {
     updatedPlan
   }
 
+  /**
+   * This method will start load process on the data map
+   */
+  def startDataLoadForDataMap(
+      parentCarbonTable: CarbonTable,
+      dataMapIdentifier: TableIdentifier,
+      queryString: String,
+      segmentToLoad: String,
+      validateSegments: Boolean,
+      sparkSession: SparkSession): Unit = {
+    CarbonSession.threadSet(
+      CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+      parentCarbonTable.getDatabaseName + "." +
+      parentCarbonTable.getTableName,
+      segmentToLoad)
+    CarbonSession.threadSet(
+      CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+      parentCarbonTable.getDatabaseName + "." +
+      parentCarbonTable.getTableName, validateSegments.toString)
+    val headers = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.
+      find(_.getChildSchema.getTableName.equals(dataMapIdentifier.table)).get.getChildSchema.
+      getListOfColumns.asScala.map(_.getColumnName).mkString(",")
+    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+      queryString)).drop("preAggLoad")
+    try {
+      CarbonLoadDataCommand(dataMapIdentifier.database,
+        dataMapIdentifier.table,
+        null,
+        Nil,
+        Map("fileheader" -> headers),
+        isOverwriteTable = false,
+        dataFrame = Some(dataFrame),
+        internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
+        run(sparkSession)
+    } finally {
+      CarbonSession.threadUnset(
+        CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+        parentCarbonTable.getDatabaseName + "." +
+        parentCarbonTable.getTableName)
+      CarbonSession.threadUnset(
+        CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+        parentCarbonTable.getDatabaseName + "." +
+        parentCarbonTable.getTableName)
+    }
+  }
+
 }