Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 14:57:45 UTC
[03/20] carbondata git commit: [CARBONDATA-2189] Add DataMapProvider developer interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
new file mode 100644
index 0000000..4c0e637
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+
+/**
+ * Helper class used to create a pre-aggregate table and to update the parent
+ * table with the child table information.
+ * The operation either succeeds completely or leaves the system unchanged,
+ * i.e. nothing happens in case of failure:
+ * 1. failed to create the pre-aggregate table, or
+ * 2. failed to update the main table
+ */
+case class PreAggregateTableHelper(
+ var parentTable: CarbonTable,
+ dataMapName: String,
+ dataMapClassName: String,
+ dataMapProperties: java.util.Map[String, String],
+ queryString: String,
+ timeSeriesFunction: Option[String] = None,
+ ifNotExistsSet: Boolean = false) {
+
+ var loadCommand: CarbonLoadDataCommand = _
+
+ def initMeta(sparkSession: SparkSession): Seq[Row] = {
+ val dmProperties = dataMapProperties.asScala
+ val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+ val df = sparkSession.sql(updatedQuery)
+ val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+ df.logicalPlan, queryString)
+ val fields = fieldRelationMap.keySet.toSeq
+ val tableProperties = mutable.Map[String, String]()
+ dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
+ val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
+ if (!parentTable.getTableName.equalsIgnoreCase(selectTable.getTableName)) {
+ throw new MalformedDataMapCommandException(
+ "Parent table name is different in select and create")
+ }
+ var newOrder = Seq[String]()
+ val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
+ parentOrder.foreach(parentCol =>
+ fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
+ parentCol.equals(fieldRelationMap(col).
+ columnTableRelationList.get(0).parentColumnName))
+ .map(cols => newOrder :+= cols.column)
+ )
+ tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, newOrder.mkString(","))
+ tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
+ getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
+ .LOAD_SORT_SCOPE_DEFAULT))
+ tableProperties
+ .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+ val tableIdentifier =
+ TableIdentifier(parentTable.getTableName + "_" + dataMapName,
+ Some(parentTable.getDatabaseName))
+ // prepare the table model from the collected tokens
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
+ ifNotExistPresent = ifNotExistsSet,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ Seq(),
+ tableProperties,
+ None,
+ isAlterFlow = false,
+ None)
+
+ // update the relation identifier; this will be stored in the child table
+ // so that it can be used when dropping the pre-aggregate table, since the
+ // parent table will also get updated
+ if (timeSeriesFunction.isDefined) {
+ TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties.toMap, parentTable)
+ TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
+ fieldRelationMap,
+ dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME))
+ TimeSeriesUtil.updateTimeColumnSelect(
+ fieldRelationMap,
+ dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME),
+ timeSeriesFunction.get)
+ }
+ tableModel.parentTable = Some(parentTable)
+ tableModel.dataMapRelation = Some(fieldRelationMap)
+ val tablePath = if (dmProperties.contains("path")) {
+ dmProperties("path")
+ } else {
+ CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+ }
+ CarbonCreateTableCommand(TableNewProcessor(tableModel),
+ tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
+
+ val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+ val tableInfo = table.getTableInfo
+
+ // the child schema object will be stored in the parent table schema
+ val childSchema = tableInfo.getFactTable.buildChildSchema(
+ dataMapName,
+ dataMapClassName,
+ tableInfo.getDatabaseName,
+ queryString,
+ "AGGREGATION")
+ dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+
+ // update the parent table with the child table information
+ PreAggregateUtil.updateMainTable(parentTable, childSchema, sparkSession)
+
+ // After updating the parent carbon table with the datamap entry, fetch the
+ // latest table object to be used in the rest of the create process.
+ parentTable = CarbonEnv.getCarbonTable(Some(parentTable.getDatabaseName),
+ parentTable.getTableName)(sparkSession)
+
+ val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
+ PreAggregateUtil.createTimeSeriesSelectQueryFromMain(
+ childSchema.getChildSchema,
+ parentTable.getTableName,
+ parentTable.getDatabaseName)
+ } else {
+ queryString
+ }
+ val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+ updatedLoadQuery)).drop("preAggLoad")
+ loadCommand = PreAggregateUtil.createLoadCommandForChild(
+ childSchema.getChildSchema.getListOfColumns,
+ tableIdentifier,
+ dataFrame,
+ isOverwrite = false,
+ sparkSession = sparkSession)
+ loadCommand.processMetadata(sparkSession)
+ Seq.empty
+ }
+
+ def initData(sparkSession: SparkSession): Seq[Row] = {
+ // Load the child table if the parent table has existing segments.
+ // Read the load details of the PARENT table to check whether it has any
+ // segments; if not, there is no need to fire a load for the pre-aggregate
+ // table.
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
+ val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
+ if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
+ load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
+ throw new UnsupportedOperationException(
+ "Cannot create pre-aggregate table when insert is in progress on main table")
+ } else if (loadAvailable.nonEmpty) {
+ // Passing segmentToLoad as * because we want to load all the segments into the
+ // pre-aggregate table even if the user has set some segments on the parent table.
+ loadCommand.dataFrame = Some(PreAggregateUtil
+ .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
+ PreAggregateUtil.startDataLoadForDataMap(
+ TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
+ segmentToLoad = "*",
+ validateSegments = true,
+ loadCommand,
+ isOverwrite = false,
+ sparkSession)
+ }
+ Seq.empty
+ }
+}
+
+
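For reference, the expected driving sequence for this helper is initMeta followed by initData, presumably from the DataMapProvider implementations added elsewhere in this commit. A minimal sketch, assuming a SparkSession and a resolved parent CarbonTable are already in scope (the datamap name, properties, and query are illustrative only):

    val helper = PreAggregateTableHelper(
      parentTable = parentTable,
      dataMapName = "agg_sales",          // hypothetical datamap name
      dataMapClassName = "preaggregate",  // hypothetical provider short name
      dataMapProperties = new java.util.HashMap[String, String](),
      queryString = "SELECT country, sum(amount) FROM sales GROUP BY country")
    helper.initMeta(sparkSession)  // creates the child table and registers it on the parent
    helper.initData(sparkSession)  // loads existing parent segments into the child
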
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index dd42b50..b52d0e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.DataType
@@ -407,11 +407,11 @@ object PreAggregateUtil {
* Below method is used to update the main table with the pre-aggregate table information;
* in case of any exception it throws an error so that pre-aggregate table creation fails
*
- * @param childSchema
- * @param sparkSession
+ * @return the existing TableInfo object before updating; it can be used to recover
+ * if any operation fails later
*/
def updateMainTable(carbonTable: CarbonTable,
- childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
+ childSchema: DataMapSchema, sparkSession: SparkSession): TableInfo = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
LockUsage.DROP_TABLE_LOCK)
@@ -424,7 +424,7 @@ object PreAggregateUtil {
locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
// get the latest carbon table and check for column existence
// read the latest schema file
- val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
thriftTableInfo,
@@ -436,12 +436,11 @@ object PreAggregateUtil {
throw new MetadataProcessException("DataMap name already exist")
}
wrapperTableInfo.getDataMapSchemaList.add(childSchema)
- val thriftTable = schemaConverter
- .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
- updateSchemaInfo(carbonTable,
- thriftTable)(sparkSession)
- LOGGER.info(s"Parent table updated is successful for table" +
- s" $dbName.${childSchema.getRelationIdentifier.toString}")
+ val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(
+ wrapperTableInfo, dbName, tableName)
+ updateSchemaInfo(carbonTable, thriftTable)(sparkSession)
+ LOGGER.info(s"Parent table updated is successful for table $dbName.$tableName")
+ thriftTableInfo
} catch {
case e: Exception =>
LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
@@ -450,7 +449,6 @@ object PreAggregateUtil {
// release lock after command execution completion
releaseLocks(locks)
}
- Seq.empty
}
/**
@@ -526,7 +524,7 @@ object PreAggregateUtil {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
carbonTable.getTableLastUpdatedTime
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
metastore.revertTableSchemaForPreAggCreationFailure(
carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
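
Returning the pre-update TableInfo makes a rollback pattern possible at the call site. A minimal sketch reusing the revertTableSchemaForPreAggCreationFailure call visible above (the try/catch wrapper is illustrative, not code from this commit):

    val oldTableInfo = PreAggregateUtil.updateMainTable(parentTable, childSchema, sparkSession)
    try {
      // ... continue with child table creation and load setup ...
    } catch {
      case e: Exception =>
        // restore the parent schema captured before the update
        metastore.revertTableSchemaForPreAggCreationFailure(
          parentTable.getAbsoluteTableIdentifier, oldTableInfo)(sparkSession)
        throw e
    }
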
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 064ba18..4962c3a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -63,7 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
// get the latest carbon table and check for column existence
// read the latest schema file
- val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter
.fromExternalToWrapperTableInfo(thriftTableInfo,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 4ef0df9..c8f7ac7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -74,7 +74,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
}
// read the latest schema file
val tableInfo: org.apache.carbondata.format.TableInfo =
- metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ metastore.getThriftTableInfo(carbonTable)
// maintain the added column for schema evolution history
var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 7bbefd7..a64bdb9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -100,7 +100,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
// read the latest schema file
val tableInfo: org.apache.carbondata.format.TableInfo =
- metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ metastore.getThriftTableInfo(carbonTable)
// maintain the deleted columns for schema evolution history
var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
val columnSchemaList = tableInfo.fact_table.table_columns.asScala
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 2503fc3..05c0059 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -108,7 +108,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
sparkSession)
OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
val tableInfo: org.apache.carbondata.format.TableInfo =
- metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ metastore.getThriftTableInfo(carbonTable)
val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
schemaEvolutionEntry.setTableName(newTableName)
timeStamp = System.currentTimeMillis()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 2b35416..6fd0820 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -63,13 +63,13 @@ object TimeSeriesUtil {
* @return whether exactly one granularity was found
*/
def validateTimeSeriesGranularity(
- dmProperties: Map[String, String],
+ dmProperties: java.util.Map[String, String],
dmClassName: String): Boolean = {
var isFound = false
// 1. only one granularity is supported
for (granularity <- Granularity.values()) {
- if (dmProperties.get(granularity.getName).isDefined) {
+ if (dmProperties.containsKey(granularity.getName)) {
if (isFound) {
throw new MalformedDataMapCommandException(
s"Only one granularity level can be defined")
@@ -104,14 +104,14 @@ object TimeSeriesUtil {
* @return key and value tuple
*/
def getTimeSeriesGranularityDetails(
- dmProperties: Map[String, String],
+ dmProperties: java.util.Map[String, String],
dmClassName: String): (String, String) = {
val defaultValue = "1"
for (granularity <- Granularity.values()) {
- if (dmProperties.get(granularity.getName).isDefined &&
- dmProperties.get(granularity.getName).get.trim.equalsIgnoreCase(defaultValue)) {
- return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName).get)
+ if (dmProperties.containsKey(granularity.getName) &&
+ dmProperties.get(granularity.getName).trim.equalsIgnoreCase(defaultValue)) {
+ return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName))
}
}
@@ -194,8 +194,8 @@ object TimeSeriesUtil {
* @param timeSeriesColumn
* timeseries column name
*/
- def updateTimeColumnSelect(fieldMapping: scala.collection.mutable
- .LinkedHashMap[Field, DataMapField],
+ def updateTimeColumnSelect(
+ fieldMapping: scala.collection.mutable.LinkedHashMap[Field, DataMapField],
timeSeriesColumn: String,
timeSeriesFunction: String) : Any = {
val isTimeSeriesColumnExits = fieldMapping
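
With the change from a Scala Map to java.util.Map, callers can now pass the datamap properties map through without conversion. A minimal sketch of the granularity validation (the property key follows the Granularity enum naming and is an assumption here):

    val dmProperties = new java.util.HashMap[String, String]()
    dmProperties.put("hour_granularity", "1")  // assumed granularity key name
    // true when exactly one granularity is defined; throws if more than one is found
    val ok = TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, "timeseries")
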
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index fd09e48..b3438a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -523,8 +523,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
metadata.carbonTables
- override def getThriftTableInfo(carbonTable: CarbonTable)
- (sparkSession: SparkSession): TableInfo = {
+ override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = {
val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)
CarbonUtil.readSchemaFile(tableMetadataFile)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 5e242b7..96ef473 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -96,8 +96,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
Seq()
}
- override def getThriftTableInfo(carbonTable: CarbonTable)
- (sparkSession: SparkSession): format.TableInfo = {
+ override def getThriftTableInfo(carbonTable: CarbonTable): format.TableInfo = {
val schemaConverter = new ThriftWrapperSchemaConverterImpl
schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
carbonTable.getDatabaseName,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index fc5bdac..3f1ebbc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -143,8 +143,9 @@ trait CarbonMetaStore {
def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
- def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession):
- org.apache.carbondata.format.TableInfo
+ def getThriftTableInfo(
+ carbonTable: CarbonTable
+ ): org.apache.carbondata.format.TableInfo
def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index aaa87a3..9c2c7e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -192,7 +192,7 @@ object AlterTableUtil {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val fileType = FileFactory.getFileType(tablePath)
if (FileFactory.isFileExist(tablePath, fileType)) {
- val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession)
+ val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)
val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -221,7 +221,7 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -246,7 +246,7 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -277,7 +277,7 @@ object AlterTableUtil {
(sparkSession: SparkSession): Unit = {
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
if (updatedTime == timeStamp) {
@@ -326,7 +326,7 @@ object AlterTableUtil {
carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
// get the latest carbon table
// read the latest schema file
- val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+ val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
thriftTableInfo,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index f514074..be8d4d7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -31,13 +31,13 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory;
import org.apache.carbondata.core.datastore.page.ColumnPage;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.processing.store.TablePage;
/**
- * It is for writing DataMap for one table
+ * It is for writing IndexDataMap for one table
*/
public class DataMapWriterListener {
@@ -55,7 +55,7 @@ public class DataMapWriterListener {
List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
if (tableDataMaps != null) {
for (TableDataMap tableDataMap : tableDataMaps) {
- DataMapFactory factory = tableDataMap.getDataMapFactory();
+ IndexDataMapFactory factory = tableDataMap.getIndexDataMapFactory();
register(factory, segmentId, dataWritePath);
}
}
@@ -64,7 +64,7 @@ public class DataMapWriterListener {
/**
* Register an AbstractDataMapWriter
*/
- private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
+ private void register(IndexDataMapFactory factory, String segmentId, String dataWritePath) {
assert (factory != null);
assert (segmentId != null);
DataMapMeta meta = factory.getMeta();
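
Taken together, the listener resolves the IndexDataMapFactory of every datamap on the table and registers one writer per factory. A minimal sketch of driving it during load (the registerAllWriter entry point is assumed; only its body is visible in the hunk above):

    val listener = new DataMapWriterListener()
    // assumed entry point: iterates DataMapStoreManager.getAllDataMap(carbonTable)
    // and calls register(factory, segmentId, dataWritePath) per IndexDataMapFactory
    listener.registerAllWriter(carbonTable, segmentId, dataWritePath)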