You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/06 16:21:37 UTC
carbondata git commit: [CARBONDATA-1737] [CARBONDATA-1760] [PreAgg]
Fixed partial load issue when the user has set segments to access on the parent table
Repository: carbondata
Updated Branches:
refs/heads/master 0bbfa8597 -> 0e8707a60
[CARBONDATA-1737] [CARBONDATA-1760] [PreAgg] Fixed partial load issue when the user has set segments to access on the parent table
Analysis: A partial load was happening on the pre-aggregate table when the user had set segments to access for the parent table.
Solution: Set the segments to access to '*' before firing the load for the child table, so the pre-aggregate table is built from all segments of the parent table.
This closes #1613
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0e8707a6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0e8707a6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0e8707a6
Branch: refs/heads/master
Commit: 0e8707a60cdef2604dc78c61e53bd349a7e90d28
Parents: 0bbfa85
Author: kunal642 <ku...@gmail.com>
Authored: Tue Dec 5 17:01:46 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Dec 6 21:51:23 2017 +0530
----------------------------------------------------------------------
.../preaggregate/TestPreAggregateLoad.scala | 17 +++++++
.../carbondata/spark/rdd/CarbonScanRDD.scala | 22 ++++++---
.../CreatePreAggregateTableCommand.scala | 40 +++++++--------
.../preaaggregate/PreAggregateListeners.scala | 41 +++------------
.../preaaggregate/PreAggregateUtil.scala | 52 +++++++++++++++++++-
5 files changed, 106 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 1502c53..569439c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -170,4 +170,21 @@ class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
}.getMessage.equalsIgnoreCase("Cannot insert/load data directly into pre-aggregate table"))
}
+ test("test whether all segments are loaded into pre-aggregate table if segments are set on main table") {
+ sql("DROP TABLE IF EXISTS maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
+ sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 26)")
+ sql("set carbon.input.segments.default.maintable=0")
+ sql(
+ s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id"""
+ .stripMargin)
+ sql("reset")
+ checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52))
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 67d75bd..1fa838c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -359,14 +359,20 @@ class CarbonScanRDD(
}
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
- CarbonTableInputFormat.setAggeragateTableSegments(conf, carbonSessionInfo.getSessionParams
- .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- identifier.getCarbonTableIdentifier.getDatabaseName + "." +
- identifier.getCarbonTableIdentifier.getTableName, ""))
- CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
- .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
- identifier.getCarbonTableIdentifier.getDatabaseName + "." +
- identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
+ val segmentsToScan = carbonSessionInfo.getSessionParams.getProperty(
+ CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+ identifier.getCarbonTableIdentifier.getTableName)
+ if (segmentsToScan != null) {
+ CarbonTableInputFormat.setAggeragateTableSegments(conf, segmentsToScan)
+ }
+ val validateSegments = carbonSessionInfo.getSessionParams.getProperty(
+ CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ identifier.getCarbonTableIdentifier.getDatabaseName + "." +
+ identifier.getCarbonTableIdentifier.getTableName)
+ if (validateSegments != null) {
+ CarbonTableInputFormat.setValidateSegmentsToAccess(conf, validateSegments.toBoolean)
+ }
}
format
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 6cee0e8..1ebf511 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -23,13 +23,13 @@ import scala.collection.mutable
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
-import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.path.CarbonTablePath
/**
* Below command class will be used to create pre-aggregate table
@@ -69,7 +69,8 @@ case class CreatePreAggregateTableCommand(
None)
val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
- assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table))
+ assert(parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table),
+ "Parent table name is different in select and create")
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
@@ -102,14 +103,11 @@ case class CreatePreAggregateTableCommand(
override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
// drop child table and undo the change in table info of main table
- CarbonDropTableCommand(
+ CarbonDropDataMapCommand(
+ dataMapName,
ifExistsSet = true,
- tableIdentifier.database,
- tableIdentifier.table
- ).run(sparkSession)
-
- // TODO: undo the change in table info of main table
-
+ parentTableIdentifier.database,
+ parentTableIdentifier.table).run(sparkSession)
Seq.empty
}
@@ -124,19 +122,15 @@ case class CreatePreAggregateTableCommand(
val loadAvailable = SegmentStatusManager.readLoadMetadata(parentCarbonTable.getMetaDataFilepath)
.nonEmpty
if (loadAvailable) {
- val headers = parentCarbonTable.getTableInfo.getFactTable.getListOfColumns.
- asScala.map(_.getColumnName).mkString(",")
- val childDataFrame = sparkSession.sql(
- new CarbonSpark2SqlParser().addPreAggLoadFunction(queryString))
- CarbonLoadDataCommand(tableIdentifier.database,
- tableIdentifier.table,
- null,
- Nil,
- Map("fileheader" -> headers),
- isOverwriteTable = false,
- dataFrame = Some(childDataFrame),
- internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
- run(sparkSession)
+ // Passing segmentToLoad as * because we want to load all the segments into the
+ // pre-aggregate table even if the user has set some segments on the parent table.
+ PreAggregateUtil.startDataLoadForDataMap(
+ parentCarbonTable,
+ tableIdentifier,
+ queryString,
+ segmentToLoad = "*",
+ validateSegments = true,
+ sparkSession = sparkSession)
}
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index d314488..90b728d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.CarbonSession
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -40,41 +41,15 @@ object LoadPostAggregateListener extends OperationEventListener {
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (table.hasDataMapSchema) {
for (dataMapSchema: DataMapSchema <- table.getTableInfo.getDataMapSchemaList.asScala) {
- CarbonSession.threadSet(
- CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- carbonLoadModel.getDatabaseName + "." +
- carbonLoadModel.getTableName,
- carbonLoadModel.getSegmentId)
- CarbonSession.threadSet(
- CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
- carbonLoadModel.getDatabaseName + "." +
- carbonLoadModel.getTableName, "false")
val childTableName = dataMapSchema.getRelationIdentifier.getTableName
val childDatabaseName = dataMapSchema.getRelationIdentifier.getDatabaseName
- val childDataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
- s"${ dataMapSchema.getProperties.get("CHILD_SELECT QUERY") } ")).drop("preAggLoad")
- val headers = dataMapSchema.getChildSchema.getListOfColumns.
- asScala.map(_.getColumnName).mkString(",")
- try {
- CarbonLoadDataCommand(Some(childDatabaseName),
- childTableName,
- null,
- Nil,
- Map("fileheader" -> headers),
- isOverwriteTable = false,
- dataFrame = Some(childDataFrame),
- internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
- run(sparkSession)
- } finally {
- CarbonSession.threadUnset(
- CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
- carbonLoadModel.getDatabaseName + "." +
- carbonLoadModel.getTableName)
- CarbonSession.threadUnset(
- CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
- carbonLoadModel.getDatabaseName + "." +
- carbonLoadModel.getTableName)
- }
+ PreAggregateUtil.startDataLoadForDataMap(
+ table,
+ TableIdentifier(childTableName, Some(childDatabaseName)),
+ dataMapSchema.getProperties.get("CHILD_SELECT QUERY"),
+ carbonLoadModel.getSegmentId,
+ validateSegments = false,
+ sparkSession)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0e8707a6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 43dc39e..95a711e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, DataFrame, SparkSession}
import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
@@ -31,12 +31,14 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -493,4 +495,50 @@ object PreAggregateUtil {
updatedPlan
}
+ /**
+ * This method will start load process on the data map
+ */
+ def startDataLoadForDataMap(
+ parentCarbonTable: CarbonTable,
+ dataMapIdentifier: TableIdentifier,
+ queryString: String,
+ segmentToLoad: String,
+ validateSegments: Boolean,
+ sparkSession: SparkSession): Unit = {
+ CarbonSession.threadSet(
+ CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ parentCarbonTable.getDatabaseName + "." +
+ parentCarbonTable.getTableName,
+ segmentToLoad)
+ CarbonSession.threadSet(
+ CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ parentCarbonTable.getDatabaseName + "." +
+ parentCarbonTable.getTableName, validateSegments.toString)
+ val headers = parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.
+ find(_.getChildSchema.getTableName.equals(dataMapIdentifier.table)).get.getChildSchema.
+ getListOfColumns.asScala.map(_.getColumnName).mkString(",")
+ val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+ queryString)).drop("preAggLoad")
+ try {
+ CarbonLoadDataCommand(dataMapIdentifier.database,
+ dataMapIdentifier.table,
+ null,
+ Nil,
+ Map("fileheader" -> headers),
+ isOverwriteTable = false,
+ dataFrame = Some(dataFrame),
+ internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true")).
+ run(sparkSession)
+ } finally {
+ CarbonSession.threadUnset(
+ CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ parentCarbonTable.getDatabaseName + "." +
+ parentCarbonTable.getTableName)
+ CarbonSession.threadUnset(
+ CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ parentCarbonTable.getDatabaseName + "." +
+ parentCarbonTable.getTableName)
+ }
+ }
+
}