Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/11 12:02:52 UTC
[11/11] carbondata git commit: [CARBONDATA-1524][CARBONDATA-1525][AggTable] Added support for aggregate table drop
This closes #1443
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ee6bd2f8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ee6bd2f8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ee6bd2f8
Branch: refs/heads/pre-aggregate
Commit: ee6bd2f86168dce4ebd0b3464a527e509307055d
Parents: 1d560c0
Author: kunal642 <ku...@gmail.com>
Authored: Wed Oct 18 20:09:04 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Nov 11 17:32:02 2017 +0530
----------------------------------------------------------------------
.../table/column/ParentColumnTableRelation.java | 2 +
.../preaggregate/TestPreAggregateDrop.scala | 67 ++++++++++++++++++++
.../carbondata/events/DropTableEvents.scala | 6 +-
.../org/apache/carbondata/events/Events.scala | 2 +-
.../command/carbonTableSchemaCommon.scala | 2 +-
.../command/CarbonDropTableCommand.scala | 29 ++++++---
.../CreatePreAggregateTableCommand.scala | 4 +-
.../DropPreAggregateTablePostListener.scala | 49 ++++++++++++++
.../preaaggregate/PreAggregateUtil.scala | 26 ++++----
.../spark/sql/hive/CarbonFileMetastore.scala | 53 ++++++++++++++--
.../spark/sql/hive/CarbonHiveMetaStore.scala | 13 +++-
.../spark/sql/hive/CarbonSessionState.scala | 13 ++++
12 files changed, 231 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
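The behavior this patch adds is easiest to see through the statements the new TestPreAggregateDrop suite runs: dropping a pre-aggregate table removes only that table, while dropping the main table now cascades to every pre-aggregate table registered against it. A minimal usage sketch in Scala, assuming a SparkSession (`spark`) already configured with CarbonData; the table names are illustrative:

  // Create a main table and one pre-aggregate table on top of it.
  spark.sql("create table maintable (a string, b string, c string) stored by 'carbondata'")
  spark.sql(
    "create table preagg1 stored by 'carbondata' tblproperties('parent'='maintable') " +
    "as select a, sum(b) from maintable group by a")
  // Dropping the parent fires DropTablePostEvent; the new listener then drops preagg1 too.
  spark.sql("drop table if exists maintable")
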
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
index 425d0f2..28dc12c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ParentColumnTableRelation.java
@@ -29,6 +29,8 @@ import org.apache.carbondata.core.metadata.schema.table.Writable;
*/
public class ParentColumnTableRelation implements Serializable, Writable {
+ private static final long serialVersionUID = 1321746085997166646L;
+
private RelationIdentifier relationIdentifier;
/**
* parent column id
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
new file mode 100644
index 0000000..4dad3e1
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ sql("drop table if exists maintable")
+ sql("drop table if exists preagg1")
+ sql("drop table if exists preagg2")
+ sql("create table maintable (a string, b string, c string) stored by 'carbondata'")
+ }
+
+ test("create and drop preaggregate table") {
+ sql(
+ "create table preagg1 stored BY 'carbondata' tblproperties('parent'='maintable') as select" +
+ " a,sum(b) from maintable group by a")
+ sql("drop table if exists preagg1")
+ checkExistence(sql("show tables"), false, "preagg1")
+ }
+
+ test("dropping 1 aggregate table should not drop others") {
+ sql(
+ "create table preagg1 stored BY 'carbondata' tblproperties('parent'='maintable') as select" +
+ " a,sum(b) from maintable group by a")
+ sql(
+ "create table preagg2 stored BY 'carbondata' tblproperties('parent'='maintable') as select" +
+ " a,sum(c) from maintable group by a")
+ sql("drop table if exists preagg2")
+ val showTables = sql("show tables")
+ checkExistence(showTables, false, "preagg2")
+ checkExistence(showTables, true, "preagg1")
+ }
+
+ test("drop main table and check if preaggreagte is deleted") {
+ sql(
+ "create table preagg2 stored BY 'carbondata' tblproperties('parent'='maintable') as select" +
+ " a,sum(c) from maintable group by a")
+ sql("drop table if exists maintable")
+ checkExistence(sql("show tables"), false, "preagg1", "maintable", "preagg2")
+ }
+
+ override def afterAll() {
+ sql("drop table if exists maintable")
+ sql("drop table if exists preagg1")
+ sql("drop table if exists preagg2")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
index ed43de6..ab77fba 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
@@ -27,7 +27,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
* @param ifExistsSet
* @param sparkSession
*/
-case class DropTablePreEvent(carbonTable: CarbonTable,
+case class DropTablePreEvent(carbonTable: Option[CarbonTable],
ifExistsSet: Boolean,
sparkSession: SparkSession)
extends Event with DropTableEventInfo
@@ -39,7 +39,7 @@ case class DropTablePreEvent(carbonTable: CarbonTable,
* @param ifExistsSet
* @param sparkSession
*/
-case class DropTablePostEvent(carbonTable: CarbonTable,
+case class DropTablePostEvent(carbonTable: Option[CarbonTable],
ifExistsSet: Boolean,
sparkSession: SparkSession)
extends Event with DropTableEventInfo
@@ -51,7 +51,7 @@ case class DropTablePostEvent(carbonTable: CarbonTable,
* @param ifExistsSet
* @param sparkSession
*/
-case class DropTableAbortEvent(carbonTable: CarbonTable,
+case class DropTableAbortEvent(carbonTable: Option[CarbonTable],
ifExistsSet: Boolean,
sparkSession: SparkSession)
extends Event with DropTableEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 0d923ed..4f8d57e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -57,7 +57,7 @@ trait LookupRelationEventInfo {
* event for drop table
*/
trait DropTableEventInfo {
- val carbonTable: CarbonTable
+ val carbonTable: Option[CarbonTable]
val ifExistsSet: Boolean
}
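
Because the payload is now Option[CarbonTable], every listener has to cope with None, which occurs when `drop table if exists` targets a table that is not present. A minimal sketch of a custom listener against this trait; the class name AuditDropListener is hypothetical, while the event, listener, and bus APIs are the ones in this commit:

  import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationContext, OperationEventListener, OperationListenerBus}

  class AuditDropListener extends OperationEventListener {
    override def onEvent(event: Event, operationContext: OperationContext): Unit = {
      val dropEvent = event.asInstanceOf[DropTablePostEvent]
      dropEvent.carbonTable match {
        // Some: the table was resolved before the drop ran.
        case Some(table) => println(s"dropped ${table.getDatabaseName}.${table.getFactTableName}")
        // None: "if exists" was set and the table was already gone.
        case None => println("drop completed for an absent table")
      }
    }
  }

  // Same registration call this commit adds in CarbonSessionState.init().
  OperationListenerBus.getInstance().addListener(classOf[DropTablePostEvent], new AuditDropListener)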
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 37ba8a5..759d3d8 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -428,7 +428,7 @@ class TableNewProcessor(cm: TableModel) {
columnSchema.setSortColumn(false)
if(isParentColumnRelation) {
val dataMapField = map.get.get(field).get
- columnSchema.setAggFunction(dataMapField.aggregateFunction);
+ columnSchema.setAggFunction(dataMapField.aggregateFunction)
val relation = dataMapField.columnTableRelation.get
val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
val relationIdentifier = new RelationIdentifier(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
index 5905493..a8e6c37 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonDropTableCommand.scala
@@ -21,12 +21,14 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events.{DropTablePostEvent, DropTablePreEvent, OperationContext, OperationListenerBus}
@@ -60,9 +62,20 @@ case class CarbonDropTableCommand(
lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTableIdentifier, lock)
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-
- // fires the event before dropping main table
- val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
+ val carbonTable: Option[CarbonTable] =
+ catalog.getTableFromMetadataCache(dbName, tableName) match {
+ case Some(tableMeta) => Some(tableMeta.carbonTable)
+ case None => try {
+ Some(catalog.lookupRelation(identifier)(sparkSession)
+ .asInstanceOf[CarbonRelation].metaData.carbonTable)
+ } catch {
+ case ex: NoSuchTableException =>
+ if (!ifExistsSet) {
+ throw ex
+ }
+ None
+ }
+ }
val operationContext = new OperationContext
val dropTablePreEvent: DropTablePreEvent =
DropTablePreEvent(
@@ -70,23 +83,23 @@ case class CarbonDropTableCommand(
ifExistsSet,
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-
CarbonEnv.getInstance(sparkSession).carbonMetastore
.dropTable(tableIdentifier.getTablePath, identifier)(sparkSession)
+ // fires the event after dropping main table
val dropTablePostEvent: DropTablePostEvent =
DropTablePostEvent(
carbonTable,
ifExistsSet,
sparkSession)
- OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-
+ OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
} catch {
case ex: Exception =>
LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
- sys.error(s"Dropping table $dbName.$tableName failed: ${ex.getMessage}")
- } finally {
+ sys.error(s"Dropping table $dbName.$tableName failed: ${ ex.getMessage }")
+ }
+ finally {
if (carbonLocks.nonEmpty) {
val unlocked = carbonLocks.forall(_.unlock())
if (unlocked) {
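
The lookup above (metadata cache first, then a metastore lookupRelation, with NoSuchTableException swallowed only under `if exists`) reads as one small function. A hedged distillation: resolveCarbonTable is a hypothetical helper not in the commit, but the calls inside it are exactly the ones from this diff and its imports:

  def resolveCarbonTable(dbName: String, tableName: String, ifExistsSet: Boolean)
      (sparkSession: SparkSession): Option[CarbonTable] = {
    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
    catalog.getTableFromMetadataCache(dbName, tableName) match {
      case Some(tableMeta) => Some(tableMeta.carbonTable) // fast path: cached schema
      case None =>
        try {
          Some(catalog.lookupRelation(TableIdentifier(tableName, Some(dbName)))(sparkSession)
            .asInstanceOf[CarbonRelation].metaData.carbonTable)
        } catch {
          // Swallow the miss only when "drop table if exists" was used;
          // otherwise the exception propagates, as in the command above.
          case _: NoSuchTableException if ifExistsSet => None
        }
    }
  }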
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index ca384f9..e42e933 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -24,7 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.{RelationIdentifier, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.CarbonUtil
/**
@@ -132,5 +132,3 @@ case class CreatePreAggregateTableCommand(
Seq.empty
}
}
-
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
new file mode 100644
index 0000000..7127c46
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/DropPreAggregateTablePostListener.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.preaaggregate
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.command.CarbonDropTableCommand
+
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationContext, OperationEventListener}
+
+class DropPreAggregateTablePostListener extends OperationEventListener {
+
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
+ val carbonTable = dropPostEvent.carbonTable
+ val sparkSession = dropPostEvent.sparkSession
+ if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList != null &&
+ !carbonTable.get.getTableInfo.getDataMapSchemaList.isEmpty) {
+ val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
+ for (childSchema: DataMapSchema <- childSchemas.asScala) {
+ CarbonDropTableCommand(ifExistsSet = true,
+ Some(childSchema.getRelationIdentifier.getDatabaseName),
+ childSchema.getRelationIdentifier.getTableName).run(sparkSession)
+ }
+ }
+
+ }
+}
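
Wiring-wise, this listener only runs when CarbonDropTableCommand fires the post event, as shown in the command diff above. A sketch of that flow, restating calls from this commit and assuming carbonTable and sparkSession are already in scope:

  val operationContext = new OperationContext
  val dropTablePostEvent = DropTablePostEvent(Some(carbonTable), ifExistsSet = true, sparkSession)
  OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
  // DropPreAggregateTablePostListener.onEvent then issues
  // CarbonDropTableCommand(ifExistsSet = true, ...) for each child DataMapSchema.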
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index c4b6783..fd0e543 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -59,8 +59,8 @@ object PreAggregateUtil {
/**
* Below method will be used to validate the select plan
* and get the required fields from select plan
- * Currently only aggregate query is support any other type of query will
- * fail
+ * Currently only aggregate queries are supported; any other type of query will fail
+ *
* @param plan
* @param selectStmt
* @return list of fields
@@ -89,11 +89,11 @@ object PreAggregateUtil {
throw new MalformedCarbonCommandException(
"Distinct is not supported On Pre Aggregation")
}
- fieldToDataMapFieldMap ++= ((validateAggregateFunctionAndGetFields(carbonTable,
+ fieldToDataMapFieldMap ++= (validateAggregateFunctionAndGetFields(carbonTable,
attr.aggregateFunction,
parentTableName,
parentDatabaseName,
- parentTableId)))
+ parentTableId))
case attr: AttributeReference =>
fieldToDataMapFieldMap += getField(attr.name,
attr.dataType,
@@ -124,6 +124,7 @@ object PreAggregateUtil {
* in case of any other aggregate function it will throw error
* In case of avg it will return two fields one for count
* and other of sum of that column to support rollup
+ *
* @param carbonTable
* @param aggFunctions
* @param parentTableName
@@ -220,6 +221,7 @@ object PreAggregateUtil {
/**
* Below method will be used to get the fields object for pre aggregate table
+ *
* @param columnName
* @param dataType
* @param aggregateType
@@ -256,8 +258,7 @@ object PreAggregateUtil {
precision = precision,
scale = scale,
rawSchema = rawSchema), dataMapField)
- }
- else {
+ } else {
(Field(column = actualColumnName,
dataType = Some(dataType.typeName),
name = Some(actualColumnName),
@@ -268,7 +269,8 @@ object PreAggregateUtil {
/**
* Below method will be used to update the main table about the pre aggregate table information
- * in case of any exption it will throw error so pre aggregate table creation will fail
+ * in case of any exception it will throw an error so pre aggregate table creation will fail
+ *
* @param dbName
* @param tableName
* @param childSchema
@@ -304,9 +306,8 @@ object PreAggregateUtil {
val thriftTable = schemaConverter
.fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
updateSchemaInfo(carbonTable,
- thriftTable)(sparkSession,
- sparkSession.sessionState.asInstanceOf[CarbonSessionState])
- LOGGER.info(s"Pre Aggeragte Parent table updated is successful for table $dbName.$tableName")
+ thriftTable)(sparkSession)
+ LOGGER.info(s"Parent table updated is successful for table $dbName.$tableName")
} catch {
case e: Exception =>
LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
@@ -321,14 +322,13 @@ object PreAggregateUtil {
/**
* Below method will be used to update the main table schema
+ *
* @param carbonTable
* @param thriftTable
* @param sparkSession
- * @param sessionState
*/
def updateSchemaInfo(carbonTable: CarbonTable,
- thriftTable: TableInfo)(sparkSession: SparkSession,
- sessionState: CarbonSessionState): Unit = {
+ thriftTable: TableInfo)(sparkSession: SparkSession): Unit = {
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getFactTableName
CarbonEnv.getInstance(sparkSession).carbonMetastore
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 51c7f3b..ac75fa7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -20,12 +20,14 @@ package org.apache.spark.sql.hive
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, SparkSession}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -34,11 +36,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.fileoperations.FileWriteOperation
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema
import org.apache.carbondata.core.metadata.schema.table
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.writer.ThriftWriter
@@ -449,6 +450,41 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
}
+ protected def updateParentTableInfo(parentRelationIdentifier: RelationIdentifier,
+ childCarbonTable: CarbonTable)(sparkSession: SparkSession): Unit = {
+ val dbName = parentRelationIdentifier.getDatabaseName
+ val tableName = parentRelationIdentifier.getTableName
+ val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
+ try {
+ val tableMeta = metaStore.getTableFromMetadataCache(dbName, tableName)
+ if (tableMeta.isDefined) {
+ val parentCarbonTable = tableMeta.get.carbonTable
+ val childSchemas = parentCarbonTable.getTableInfo.getDataMapSchemaList
+ if (childSchemas == null) {
+ throw UninitializedFieldError("Child schemas is not initialized")
+ }
+ val childSchemaIterator = childSchemas.iterator()
+ while (childSchemaIterator.hasNext) {
+ val childSchema = childSchemaIterator.next()
+ if (childSchema.getChildSchema.equals(childCarbonTable.getTableInfo.getFactTable)) {
+ childSchemaIterator.remove()
+ }
+ }
+ val schemaConverter = new ThriftWrapperSchemaConverterImpl
+ PreAggregateUtil
+ .updateSchemaInfo(parentCarbonTable,
+ schemaConverter
+ .fromWrapperToExternalTableInfo(parentCarbonTable.getTableInfo,
+ dbName,
+ tableName))(sparkSession)
+ }
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, s"Updating parent table $dbName.$tableName failed.")
+ throw ex
+ }
+ }
+
def dropTable(tablePath: String, tableIdentifier: TableIdentifier)
(sparkSession: SparkSession) {
val dbName = tableIdentifier.database.get
@@ -461,6 +497,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
val fileType = FileFactory.getFileType(metadataFilePath)
+ if (carbonTable != null) {
+ val parentRelations = carbonTable.getTableInfo.getParentRelationIdentifiers
+ if (parentRelations != null && !parentRelations.isEmpty) {
+ for (parentRelation: RelationIdentifier <- parentRelations.asScala) {
+ updateParentTableInfo(parentRelation, carbonTable)(sparkSession)
+ }
+ }
+ }
if (FileFactory.isFileExist(metadataFilePath, fileType)) {
// while drop we should refresh the schema modified time so that if any thing has changed
@@ -468,6 +512,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
removeTableFromMetadata(dbName, tableName)
+
updateSchemasUpdatedTime(touchSchemaFileSystemTime(identifier.getStorePath))
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
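
The child-schema removal in updateParentTableInfo above relies on java.util.Iterator.remove(), the safe way to delete from a Java list while iterating over it. A standalone sketch of the same idiom; the list contents here are illustrative:

  import java.util

  val names = new util.ArrayList[String](util.Arrays.asList("preagg1", "preagg2"))
  val it = names.iterator()
  while (it.hasNext) {
    if (it.next() == "preagg2") {
      // Removing via the iterator avoids ConcurrentModificationException.
      it.remove()
    }
  }
  // names now contains only "preagg1"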
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index c64b7bb..6bd80f3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -18,14 +18,16 @@ package org.apache.spark.sql.hive
import scala.collection.JavaConverters._
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier}
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.format
@@ -79,6 +81,12 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
}
checkSchemasModifiedTimeAndReloadTables(identifier.getStorePath)
+ val parentRelations = carbonTable.getTableInfo.getParentRelationIdentifiers
+ if (parentRelations != null && !parentRelations.isEmpty) {
+ for (parentRelation: RelationIdentifier <- parentRelations.asScala) {
+ updateParentTableInfo(parentRelation, carbonTable)(sparkSession)
+ }
+ }
removeTableFromMetadata(dbName, tableName)
CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
// discard cached table info in cachedDataSourceTables
@@ -107,6 +115,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
carbonTable.getFactTableName)
}
+
/**
* This method will overwrite the existing schema and update it with the given details
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ee6bd2f8/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index 9cad7b0..97ea7f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.execution.SparkOptimizer
+import org.apache.spark.sql.execution.command.preaaggregate.DropPreAggregateTablePostListener
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.strategy.{CarbonLateDecodeStrategy, DDLStrategy, StreamingTableStrategy}
import org.apache.spark.sql.internal.SQLConf
@@ -34,6 +35,7 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParser
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.events.{DropTablePostEvent, Event, OperationListenerBus}
/**
* This class will have carbon catalog and refresh the relation from cache if the carbontable in
@@ -124,6 +126,15 @@ class CarbonSessionCatalog(
}
}
+object CarbonSessionState {
+
+ def init(): Unit = {
+ OperationListenerBus.getInstance()
+ .addListener(classOf[DropTablePostEvent], new DropPreAggregateTablePostListener)
+ }
+
+}
+
/**
* Session state implementation to override sql parser and adding strategies
* @param sparkSession
@@ -140,6 +151,8 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
)
experimentalMethods.extraOptimizations = Seq(new CarbonLateDecodeRule)
+ CarbonSessionState.init()
+
override lazy val optimizer: Optimizer = new CarbonOptimizer(catalog, conf, experimentalMethods)
override lazy val analyzer: Analyzer = {