You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:12:08 UTC

[42/49] carbondata git commit: [CARBONDATA-1524][CARBONDATA-1525][AggTable] Added support for aggregate table drop

[CARBONDATA-1524][CARBONDATA-1525][AggTable] Added support for aggregate table drop

This closes #1443


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f7f516ef
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f7f516ef
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f7f516ef

Branch: refs/heads/fgdatamap
Commit: f7f516ef665c98e43fce0427c40da933bb6f3185
Parents: 3d1d1ce
Author: kunal642 <ku...@gmail.com>
Authored: Wed Oct 18 20:09:04 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 00:48:16 2017 +0530

----------------------------------------------------------------------
 .../table/column/ParentColumnTableRelation.java |  2 +
 .../preaggregate/TestPreAggregateDrop.scala     | 67 ++++++++++++++++++++
 .../carbondata/events/DropTableEvents.scala     |  6 +-
 .../org/apache/carbondata/events/Events.scala   |  2 +-
 .../command/carbonTableSchemaCommon.scala       |  2 +-
 .../command/CarbonDropTableCommand.scala        | 29 ++++++---
 .../CreatePreAggregateTableCommand.scala        |  4 +-
 .../DropPreAggregateTablePostListener.scala     | 49 ++++++++++++++
 .../preaaggregate/PreAggregateUtil.scala        | 26 ++++----
 .../spark/sql/hive/CarbonFileMetastore.scala    | 53 ++++++++++++++--
 .../spark/sql/hive/CarbonHiveMetaStore.scala    | 13 +++-
 .../spark/sql/hive/CarbonSessionState.scala     | 13 ++++
 12 files changed, 231 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
index 425d0f2..28dc12c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.schema.table.Writable;
  */
 public class ParentColumnTableRelation implements Serializable, Writable {
 
+  private static final long serialVersionUID = 1321746085997166646L;
+
   private RelationIdentifier relationIdentifier;
   /**
    * parent column id

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
new file mode 100644
index 0000000..4dad3e1
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/** Drop scenarios for pre-aggregate tables whose 'parent' property is maintable. */
+class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll(): Unit = {
+    sql("drop table if exists maintable")
+    sql("drop table if exists preagg1")
+    sql("drop table if exists preagg2")
+    sql("create table maintable (a string, b string, c string) stored by 'carbondata'")
+  }
+
+  /** Creates pre-aggregate table `name` as select a,sum(`column`) from maintable group by a. */
+  private def createPreAggregate(name: String, column: String): Unit = sql(
+    s"create table $name stored BY 'carbondata' tblproperties('parent'='maintable') as select" +
+    s" a,sum($column) from maintable group by a")
+
+  test("create and drop preaggregate table") {
+    createPreAggregate("preagg1", "b")
+    sql("drop table if exists preagg1")
+    checkExistence(sql("show tables"), false, "preagg1")
+  }
+
+  test("dropping 1 aggregate table should not drop others") {
+    createPreAggregate("preagg1", "b")
+    createPreAggregate("preagg2", "c")
+    sql("drop table if exists preagg2")
+    val showTables = sql("show tables")
+    checkExistence(showTables, false, "preagg2")
+    checkExistence(showTables, true, "preagg1")
+  }
+
+  // preagg1 is only present here if the previous test ran first; when this test runs on
+  // its own, the preagg1 part of the assertion below passes trivially.
+  test("drop main table and check if preaggregate is deleted") {
+    createPreAggregate("preagg2", "c")
+    sql("drop table if exists maintable")
+    checkExistence(sql("show tables"), false, "preagg1", "maintable", "preagg2")
+  }
+
+  override def afterAll(): Unit = {
+    sql("drop table if exists maintable")
+    sql("drop table if exists preagg1")
+    sql("drop table if exists preagg2")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
index ed43de6..ab77fba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTablePreEvent(carbonTable: CarbonTable,
+case class DropTablePreEvent(carbonTable: Option[CarbonTable],
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo
@@ -39,7 +39,7 @@ case class DropTablePreEvent(carbonTable: CarbonTable,
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTablePostEvent(carbonTable: CarbonTable,
+case class DropTablePostEvent(carbonTable: Option[CarbonTable],
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo
@@ -51,7 +51,7 @@ case class DropTablePostEvent(carbonTable: CarbonTable,
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTableAbortEvent(carbonTable: CarbonTable,
+case class DropTableAbortEvent(carbonTable: Option[CarbonTable],
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 0d923ed..4f8d57e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -57,7 +57,7 @@ trait LookupRelationEventInfo {
  * event for drop table
  */
 trait DropTableEventInfo {
-  val carbonTable: CarbonTable
+  val carbonTable: Option[CarbonTable]
   val ifExistsSet: Boolean
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 37ba8a5..759d3d8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -428,7 +428,7 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setSortColumn(false)
     if(isParentColumnRelation) {
       val dataMapField = map.get.get(field).get
-      columnSchema.setAggFunction(dataMapField.aggregateFunction);
+      columnSchema.setAggFunction(dataMapField.aggregateFunction)
         val relation = dataMapField.columnTableRelation.get
         val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
         val relationIdentifier = new RelationIdentifier(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 5905493..a8e6c37 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -21,12 +21,14 @@ import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{DropTablePostEvent, DropTablePreEvent, OperationContext, OperationListenerBus}
@@ -60,9 +62,20 @@ case class CarbonDropTableCommand(
         lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-
-      // fires the event before dropping main table
-      val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+      val carbonTable: Option[CarbonTable] =
+        catalog.getTableFromMetadataCache(dbName, tableName) match {
+          case Some(tableMeta) => Some(tableMeta.carbonTable)
+          case None => try {
+            Some(catalog.lookupRelation(identifier)(sparkSession)
+              .asInstanceOf[CarbonRelation].metaData.carbonTable)
+          } catch {
+            case ex: NoSuchTableException =>
+              if (!ifExistsSet) {
+                throw ex
+              }
+              None
+          }
+        }
       val operationContext = new OperationContext
       val dropTablePreEvent: DropTablePreEvent =
         DropTablePreEvent(
@@ -70,23 +83,23 @@ case class CarbonDropTableCommand(
           ifExistsSet,
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-
       CarbonEnv.getInstance(sparkSession).carbonMetastore
         .dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
 
+      // fires the event after dropping main table
       val dropTablePostEvent: DropTablePostEvent =
         DropTablePostEvent(
           carbonTable,
           ifExistsSet,
           sparkSession)
-      OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-
+      OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
     } catch {
       case ex: Exception =>
         LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
-        sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
-    } finally {
+        sys.error(s"Dropping table $dbName.$tableName failed: ${ ex.getMessage }")
+    }
+    finally {
       if (carbonLocks.nonEmpty) {
         val unlocked = carbonLocks.forall(_.unlock())
         if (unlocked) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index ca384f9..e42e933 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{RelationIdentifier, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.CarbonUtil
 
 /**
@@ -132,5 +132,3 @@ case class CreatePreAggregateTableCommand(
     Seq.empty
   }
 }
-
-

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
new file mode 100644
index 0000000..7127c46
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.command.CarbonDropTableCommand
+
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationContext, OperationEventListener}
+
+class DropPreAggregateTablePostListener extends OperationEventListener {
+
+  /**
+   * Drops every child table listed in the dropped table's data map schema list.
+   * @param event expected to be a DropTablePostEvent; any other event fails the cast below
+   * @param operationContext not used by this listener
+   */
+  override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+    val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
+    val sparkSession = dropPostEvent.sparkSession
+    // A missing table (None) or a null schema list both yield no children. toList copies the
+    // Java list up front, so the loop does not rely on it staying unchanged during the drops.
+    val childSchemas: List[DataMapSchema] = dropPostEvent.carbonTable
+      .flatMap(table => Option(table.getTableInfo.getDataMapSchemaList))
+      .map(_.asScala.toList).getOrElse(Nil)
+    childSchemas.foreach { childSchema =>
+      val relation = childSchema.getRelationIdentifier
+      CarbonDropTableCommand(ifExistsSet = true,
+        Some(relation.getDatabaseName),
+        relation.getTableName).run(sparkSession)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index c4b6783..fd0e543 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -59,8 +59,8 @@ object PreAggregateUtil {
   /**
    * Below method will be used to validate the select plan
    * and get the required fields from select plan
-   * Currently only aggregate query is support any other type of query will
-   * fail
+   * Currently only aggregate queries are supported; any other type of query will fail
+   *
    * @param plan
    * @param selectStmt
    * @return list of fields
@@ -89,11 +89,11 @@ object PreAggregateUtil {
               throw new MalformedCarbonCommandException(
                 "Distinct is not supported On Pre Aggregation")
             }
-            fieldToDataMapFieldMap ++= ((validateAggregateFunctionAndGetFields(carbonTable,
+            fieldToDataMapFieldMap ++= (validateAggregateFunctionAndGetFields(carbonTable,
               attr.aggregateFunction,
               parentTableName,
               parentDatabaseName,
-              parentTableId)))
+              parentTableId))
           case attr: AttributeReference =>
             fieldToDataMapFieldMap += getField(attr.name,
               attr.dataType,
@@ -124,6 +124,7 @@ object PreAggregateUtil {
    * in case of any other aggregate function it will throw error
    * In case of avg it will return two fields one for count
    * and other of sum of that column to support rollup
+   *
    * @param carbonTable
    * @param aggFunctions
    * @param parentTableName
@@ -220,6 +221,7 @@ object PreAggregateUtil {
 
   /**
    * Below method will be used to get the fields object for pre aggregate table
+   *
    * @param columnName
    * @param dataType
    * @param aggregateType
@@ -256,8 +258,7 @@ object PreAggregateUtil {
         precision = precision,
         scale = scale,
         rawSchema = rawSchema), dataMapField)
-    }
-    else {
+    } else {
       (Field(column = actualColumnName,
         dataType = Some(dataType.typeName),
         name = Some(actualColumnName),
@@ -268,7 +269,8 @@ object PreAggregateUtil {
 
   /**
    * Below method will be used to update the main table about the pre aggregate table information
-   * in case of any exption it will throw error so pre aggregate table creation will fail
+   * in case of any exception it will throw error so pre aggregate table creation will fail
+   *
    * @param dbName
    * @param tableName
    * @param childSchema
@@ -304,9 +306,8 @@ object PreAggregateUtil {
       val thriftTable = schemaConverter
         .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
       updateSchemaInfo(carbonTable,
-        thriftTable)(sparkSession,
-        sparkSession.sessionState.asInstanceOf[CarbonSessionState])
-      LOGGER.info(s"Pre Aggeragte Parent table updated is successful for table $dbName.$tableName")
+        thriftTable)(sparkSession)
+      LOGGER.info(s"Parent table updated is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
         LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
@@ -321,14 +322,13 @@ object PreAggregateUtil {
 
   /**
    * Below method will be used to update the main table schema
+   *
    * @param carbonTable
    * @param thriftTable
    * @param sparkSession
-   * @param sessionState
    */
   def updateSchemaInfo(carbonTable: CarbonTable,
-      thriftTable: TableInfo)(sparkSession: SparkSession,
-      sessionState: CarbonSessionState): Unit = {
+      thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getFactTableName
     CarbonEnv.getInstance(sparkSession).carbonMetastore

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 51c7f3b..ac75fa7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicLong
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -34,11 +36,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
 import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
@@ -449,6 +450,41 @@ class CarbonFileMetastore extends CarbonMetaStore {
     }
   }
 
+  /**
+   * Removes the entry for childCarbonTable from the parent's data map schema list and hands
+   * the converted parent table info to PreAggregateUtil.updateSchemaInfo. Nothing is done if
+   * the parent is absent from the metadata cache; an Exception in the update is logged, rethrown.
+   */
+  protected def updateParentTableInfo(parentRelationIdentifier: RelationIdentifier,
+      childCarbonTable: CarbonTable)(sparkSession: SparkSession): Unit = {
+    val dbName = parentRelationIdentifier.getDatabaseName
+    val tableName = parentRelationIdentifier.getTableName
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    try {
+      metaStore.getTableFromMetadataCache(dbName, tableName).foreach { parentMeta =>
+        val parentTable = parentMeta.carbonTable
+        val registeredChildren = parentTable.getTableInfo.getDataMapSchemaList
+        if (registeredChildren == null) {
+          throw UninitializedFieldError("Child schemas is not initialized")
+        }
+        // Iterator.remove() lets matching entries be dropped while the list is traversed.
+        val cursor = registeredChildren.iterator()
+        while (cursor.hasNext) {
+          if (cursor.next().getChildSchema.equals(childCarbonTable.getTableInfo.getFactTable)) {
+            cursor.remove()
+          }
+        }
+        val thriftInfo = new ThriftWrapperSchemaConverterImpl()
+          .fromWrapperToExternalTableInfo(parentTable.getTableInfo, dbName, tableName)
+        PreAggregateUtil.updateSchemaInfo(parentTable, thriftInfo)(sparkSession)
+      }
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, s"Updating parent table $dbName.$tableName failed.")
+        throw ex
+    }
+  }
+
   def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession) {
     val dbName = tableIdentifier.database.get
@@ -461,6 +497,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
     val fileType = FileFactory.getFileType(metadataFilePath)
+    if (carbonTable != null) {
+      val parentRelations = carbonTable.getTableInfo.getParentRelationIdentifiers
+      if (parentRelations != null && !parentRelations.isEmpty) {
+        for (parentRelation: RelationIdentifier <- parentRelations.asScala) {
+          updateParentTableInfo(parentRelation, carbonTable)(sparkSession)
+        }
+      }
+    }
 
     if (FileFactory.isFileExist(metadataFilePath, fileType)) {
       // while drop we should refresh the schema modified time so that if any thing has changed
@@ -468,6 +512,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
 
       removeTableFromMetadata(dbName, tableName)
+
       updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       // discard cached table info in cachedDataSourceTables

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index c64b7bb..6bd80f3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -18,14 +18,16 @@ package org.apache.spark.sql.hive
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.format
@@ -79,6 +81,12 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
     checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+    val parentRelations = carbonTable.getTableInfo.getParentRelationIdentifiers
+    if (parentRelations != null && !parentRelations.isEmpty) {
+      for (parentRelation: RelationIdentifier <- parentRelations.asScala) {
+        updateParentTableInfo(parentRelation, carbonTable)(sparkSession)
+      }
+    }
     removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
@@ -107,6 +115,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       carbonTable.getFactTableName)
   }
 
+
   /**
    * This method will overwrite the existing schema and update it with the given details
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f7f516ef/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 9cad7b0..97ea7f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.command.preaaggregate.DropPreAggregateTablePostListener
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
 import org.apache.spark.sql.internal.SQLConf
@@ -34,6 +35,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationListenerBus}
 
 /**
  * This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -124,6 +126,15 @@ class CarbonSessionCatalog(
   }
 }
 
+object CarbonSessionState {
+  /**
+   * Registers DropPreAggregateTablePostListener for DropTablePostEvent on the listener bus.
+   * NOTE(review): runs once per CarbonSessionState instance - confirm repeats are harmless.
+   */
+  def init(): Unit = OperationListenerBus.getInstance()
+    .addListener(classOf[DropTablePostEvent], new DropPreAggregateTablePostListener)
+}
+
 /**
  * Session state implementation to override sql parser and adding strategies
  * @param sparkSession
@@ -140,6 +151,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
     )
   experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
 
+  CarbonSessionState.init()
+
   override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
 
   override lazy val analyzer: Analyzer = {