You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/04 14:57:45 UTC

[03/20] carbondata git commit: [CARBONDATA-2189] Add DataMapProvider developer interface

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
new file mode 100644
index 0000000..4c0e637
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+
+/**
+ * Below helper class will be used to create pre-aggregate table
+ * and updating the parent table about the child table information
+ * It will be either success or nothing happen in case of failure:
+ * 1. failed to create pre aggregate table.
+ * 2. failed to update main table
+ *
+ */
+case class PreAggregateTableHelper(
+    var parentTable: CarbonTable,
+    dataMapName: String,
+    dataMapClassName: String,
+    dataMapProperties: java.util.Map[String, String],
+    queryString: String,
+    timeSeriesFunction: Option[String] = None,
+    ifNotExistsSet: Boolean = false) {
+
+  var loadCommand: CarbonLoadDataCommand = _
+
+  def initMeta(sparkSession: SparkSession): Seq[Row] = {
+    val dmProperties = dataMapProperties.asScala
+    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+    val df = sparkSession.sql(updatedQuery)
+    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+      df.logicalPlan, queryString)
+    val fields = fieldRelationMap.keySet.toSeq
+    val tableProperties = mutable.Map[String, String]()
+    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
+    val selectTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
+    if (!parentTable.getTableName.equalsIgnoreCase(selectTable.getTableName)) {
+      throw new MalformedDataMapCommandException(
+        "Parent table name is different in select and create")
+    }
+    var neworder = Seq[String]()
+    val parentOrder = parentTable.getSortColumns(parentTable.getTableName).asScala
+    parentOrder.foreach(parentcol =>
+      fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
+                           parentcol.equals(fieldRelationMap(col).
+                             columnTableRelationList.get(0).parentColumnName))
+        .map(cols => neworder :+= cols.column)
+    )
+    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
+    tableProperties.put("sort_scope", parentTable.getTableInfo.getFactTable.
+      getTableProperties.asScala.getOrElse("sort_scope", CarbonCommonConstants
+      .LOAD_SORT_SCOPE_DEFAULT))
+    tableProperties
+      .put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+    val tableIdentifier =
+      TableIdentifier(parentTable.getTableName + "_" + dataMapName,
+        Some(parentTable.getDatabaseName))
+    // prepare table model of the collected tokens
+    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
+      ifNotExistPresent = ifNotExistsSet,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+      tableIdentifier.table.toLowerCase,
+      fields,
+      Seq(),
+      tableProperties,
+      None,
+      isAlterFlow = false,
+      None)
+
+    // updating the relation identifier, this will be stored in child table
+    // which can be used during dropping of pre-aggreate table as parent table will
+    // also get updated
+    if(timeSeriesFunction != null) {
+      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties.toMap, parentTable)
+      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(
+        fieldRelationMap,
+        dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME))
+      TimeSeriesUtil.updateTimeColumnSelect(
+        fieldRelationMap,
+        dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME),
+        timeSeriesFunction.get)
+    }
+    tableModel.parentTable = Some(parentTable)
+    tableModel.dataMapRelation = Some(fieldRelationMap)
+    val tablePath = if (dmProperties.contains("path")) {
+      dmProperties("path")
+    } else {
+      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+    }
+    CarbonCreateTableCommand(TableNewProcessor(tableModel),
+      tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
+
+    val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
+    val tableInfo = table.getTableInfo
+
+    // child schema object will be saved on parent table schema
+    val childSchema = tableInfo.getFactTable.buildChildSchema(
+      dataMapName,
+      dataMapClassName,
+      tableInfo.getDatabaseName,
+      queryString,
+      "AGGREGATION")
+    dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+
+    // updating the parent table about child table
+    PreAggregateUtil.updateMainTable(parentTable, childSchema, sparkSession)
+
+    // After updating the parent carbon table with data map entry extract the latest table object
+    // to be used in further create process.
+    parentTable = CarbonEnv.getCarbonTable(Some(parentTable.getDatabaseName),
+      parentTable.getTableName)(sparkSession)
+
+    val updatedLoadQuery = if (timeSeriesFunction != null) {
+      PreAggregateUtil.createTimeSeriesSelectQueryFromMain(
+        childSchema.getChildSchema,
+        parentTable.getTableName,
+        parentTable.getDatabaseName)
+    } else {
+      queryString
+    }
+    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
+      updatedLoadQuery)).drop("preAggLoad")
+    loadCommand = PreAggregateUtil.createLoadCommandForChild(
+      childSchema.getChildSchema.getListOfColumns,
+      tableIdentifier,
+      dataFrame,
+      isOverwrite = false,
+      sparkSession = sparkSession)
+    loadCommand.processMetadata(sparkSession)
+    Seq.empty
+  }
+
+  def initData(sparkSession: SparkSession): Seq[Row] = {
+    // load child table if parent table has existing segments
+    // This will be used to check if the parent table has any segments or not. If not then no
+    // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
+    // table.
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
+    if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
+      load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
+      throw new UnsupportedOperationException(
+        "Cannot create pre-aggregate table when insert is in progress on main table")
+    } else if (loadAvailable.nonEmpty) {
+      // Passing segmentToLoad as * because we want to load all the segments into the
+      // pre-aggregate table even if the user has set some segments on the parent table.
+      loadCommand.dataFrame = Some(PreAggregateUtil
+        .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
+      PreAggregateUtil.startDataLoadForDataMap(
+        TableIdentifier(parentTable.getTableName, Some(parentTable.getDatabaseName)),
+        segmentToLoad = "*",
+        validateSegments = true,
+        loadCommand,
+        isOverwrite = false,
+        sparkSession)
+    }
+    Seq.empty
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index dd42b50..b52d0e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonRelation}
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
@@ -407,11 +407,11 @@ object PreAggregateUtil {
    * Below method will be used to update the main table about the pre aggregate table information
    * in case of any exception it will throw error so pre aggregate table creation will fail
    *
-   * @param childSchema
-   * @param sparkSession
+   * @return the existing TableInfo object before updating, it can be used to recover if any
+   *         operation failed later
    */
   def updateMainTable(carbonTable: CarbonTable,
-      childSchema: DataMapSchema, sparkSession: SparkSession): Unit = {
+      childSchema: DataMapSchema, sparkSession: SparkSession): TableInfo = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.DROP_TABLE_LOCK)
@@ -424,7 +424,7 @@ object PreAggregateUtil {
       locks = acquireLock(dbName, tableName, locksToBeAcquired, carbonTable)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,
@@ -436,12 +436,11 @@ object PreAggregateUtil {
         throw new MetadataProcessException("DataMap name already exist")
       }
       wrapperTableInfo.getDataMapSchemaList.add(childSchema)
-      val thriftTable = schemaConverter
-        .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
-      updateSchemaInfo(carbonTable,
-        thriftTable)(sparkSession)
-      LOGGER.info(s"Parent table updated is successful for table" +
-                  s" $dbName.${childSchema.getRelationIdentifier.toString}")
+      val thriftTable = schemaConverter.fromWrapperToExternalTableInfo(
+        wrapperTableInfo, dbName, tableName)
+      updateSchemaInfo(carbonTable, thriftTable)(sparkSession)
+      LOGGER.info(s"Parent table updated is successful for table $dbName.$tableName")
+      thriftTableInfo
     } catch {
       case e: Exception =>
         LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
@@ -450,7 +449,6 @@ object PreAggregateUtil {
       // release lock after command execution completion
       releaseLocks(locks)
     }
-    Seq.empty
   }
 
   /**
@@ -526,7 +524,7 @@ object PreAggregateUtil {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
     carbonTable.getTableLastUpdatedTime
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
       metastore.revertTableSchemaForPreAggCreationFailure(
         carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 064ba18..4962c3a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -63,7 +63,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
       // get the latest carbon table and check for column existence
       // read the latest schema file
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter
         .fromExternalToWrapperTableInfo(thriftTableInfo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 4ef0df9..c8f7ac7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -74,7 +74,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       }
       // read the latest schema file
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTable)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)
       // maintain the added column for schema evolution history
       var addColumnSchema: org.apache.carbondata.format.ColumnSchema = null
       var deletedColumnSchema: org.apache.carbondata.format.ColumnSchema = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 7bbefd7..a64bdb9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -100,7 +100,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
 
       // read the latest schema file
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTable)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)
       // maintain the deleted columns for schema evolution history
       var deletedColumnSchema = ListBuffer[org.apache.carbondata.format.ColumnSchema]()
       val columnSchemaList = tableInfo.fact_table.table_columns.asScala

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 2503fc3..05c0059 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -108,7 +108,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTable)(sparkSession)
+        metastore.getThriftTableInfo(carbonTable)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 2b35416..6fd0820 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -63,13 +63,13 @@ object TimeSeriesUtil {
    * @return whether find  only one granularity
    */
   def validateTimeSeriesGranularity(
-      dmProperties: Map[String, String],
+      dmProperties: java.util.Map[String, String],
       dmClassName: String): Boolean = {
     var isFound = false
 
     // 1. granularity only support one
     for (granularity <- Granularity.values()) {
-      if (dmProperties.get(granularity.getName).isDefined) {
+      if (dmProperties.containsKey(granularity.getName)) {
         if (isFound) {
           throw new MalformedDataMapCommandException(
             s"Only one granularity level can be defined")
@@ -104,14 +104,14 @@ object TimeSeriesUtil {
    * @return key and value tuple
    */
   def getTimeSeriesGranularityDetails(
-      dmProperties: Map[String, String],
+      dmProperties: java.util.Map[String, String],
       dmClassName: String): (String, String) = {
 
     val defaultValue = "1"
     for (granularity <- Granularity.values()) {
-      if (dmProperties.get(granularity.getName).isDefined &&
-        dmProperties.get(granularity.getName).get.trim.equalsIgnoreCase(defaultValue)) {
-        return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName).get)
+      if (dmProperties.containsKey(granularity.getName) &&
+        dmProperties.get(granularity.getName).trim.equalsIgnoreCase(defaultValue)) {
+        return (granularity.toString.toLowerCase, dmProperties.get(granularity.getName))
       }
     }
 
@@ -194,8 +194,8 @@ object TimeSeriesUtil {
    * @param timeSeriesColumn
    *                         timeseries column name
    */
-  def updateTimeColumnSelect(fieldMapping: scala.collection.mutable
-  .LinkedHashMap[Field, DataMapField],
+  def updateTimeColumnSelect(
+      fieldMapping: scala.collection.mutable.LinkedHashMap[Field, DataMapField],
       timeSeriesColumn: String,
       timeSeriesFunction: String) : Any = {
     val isTimeSeriesColumnExits = fieldMapping

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index fd09e48..b3438a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -523,8 +523,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
   override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
     metadata.carbonTables
 
-  override def getThriftTableInfo(carbonTable: CarbonTable)
-    (sparkSession: SparkSession): TableInfo = {
+  override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = {
     val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)
     CarbonUtil.readSchemaFile(tableMetadataFile)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 5e242b7..96ef473 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -96,8 +96,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     Seq()
   }
 
-  override def getThriftTableInfo(carbonTable: CarbonTable)
-    (sparkSession: SparkSession): format.TableInfo = {
+  override def getThriftTableInfo(carbonTable: CarbonTable): format.TableInfo = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     schemaConverter.fromWrapperToExternalTableInfo(carbonTable.getTableInfo,
       carbonTable.getDatabaseName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index fc5bdac..3f1ebbc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -143,8 +143,9 @@ trait CarbonMetaStore {
 
   def listAllTables(sparkSession: SparkSession): Seq[CarbonTable]
 
-  def getThriftTableInfo(carbonTable: CarbonTable)(sparkSession: SparkSession):
-  org.apache.carbondata.format.TableInfo
+  def getThriftTableInfo(
+      carbonTable: CarbonTable
+  ): org.apache.carbondata.format.TableInfo
 
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable]
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index aaa87a3..9c2c7e7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -192,7 +192,7 @@ object AlterTableUtil {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val fileType = FileFactory.getFileType(tablePath)
     if (FileFactory.isFileExist(tablePath, fileType)) {
-      val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)(sparkSession)
+      val tableInfo = metastore.getThriftTableInfo(oldCarbonTable)
       val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
       val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
       if (updatedTime == timeStamp) {
@@ -221,7 +221,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -246,7 +246,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -277,7 +277,7 @@ object AlterTableUtil {
     (sparkSession: SparkSession): Unit = {
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTable)
     val evolutionEntryList = thriftTable.fact_table.schema_evolution.schema_evolution_history
     val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
     if (updatedTime == timeStamp) {
@@ -326,7 +326,7 @@ object AlterTableUtil {
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
       // get the latest carbon table
       // read the latest schema file
-      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)(sparkSession)
+      val thriftTableInfo: TableInfo = metastore.getThriftTableInfo(carbonTable)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl()
       val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         thriftTableInfo,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f8ded96e/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index f514074..be8d4d7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -31,13 +31,13 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
-import org.apache.carbondata.core.datamap.dev.DataMapFactory;
+import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.processing.store.TablePage;
 
 /**
- * It is for writing DataMap for one table
+ * It is for writing IndexDataMap for one table
  */
 public class DataMapWriterListener {
 
@@ -55,7 +55,7 @@ public class DataMapWriterListener {
     List<TableDataMap> tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
     if (tableDataMaps != null) {
       for (TableDataMap tableDataMap : tableDataMaps) {
-        DataMapFactory factory = tableDataMap.getDataMapFactory();
+        IndexDataMapFactory factory = tableDataMap.getIndexDataMapFactory();
         register(factory, segmentId, dataWritePath);
       }
     }
@@ -64,7 +64,7 @@ public class DataMapWriterListener {
   /**
    * Register a AbstractDataMapWriter
    */
-  private void register(DataMapFactory factory, String segmentId, String dataWritePath) {
+  private void register(IndexDataMapFactory factory, String segmentId, String dataWritePath) {
     assert (factory != null);
     assert (segmentId != null);
     DataMapMeta meta = factory.getMeta();