You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/07 20:46:21 UTC
carbondata git commit: [CARBONDATA-1592] Added preUpdateStatus Event
Listeners, corrected event parameters,
Repository: carbondata
Updated Branches:
refs/heads/master 6b7217a8d -> 0da0a4f61
[CARBONDATA-1592] Added preUpdateStatus Event Listeners, corrected event parameters,
This closes #1614
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0da0a4f6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0da0a4f6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0da0a4f6
Branch: refs/heads/master
Commit: 0da0a4f614130a30a3edd527c248ec27d6ac5ca8
Parents: 6b7217a
Author: Manohar <ma...@gmail.com>
Authored: Tue Dec 5 18:44:29 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Dec 8 02:16:04 2017 +0530
----------------------------------------------------------------------
.../carbondata/events/AlterTableEvents.scala | 50 ++++++++++++++------
.../org/apache/carbondata/events/Events.scala | 10 +++-
.../apache/carbondata/events/LoadEvents.scala | 4 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 31 +++++++-----
.../spark/rdd/CarbonTableCompactor.scala | 18 +++++--
.../org/apache/spark/sql/CarbonSession.scala | 2 +-
.../CarbonAlterTableCompactionCommand.scala | 22 +++++++--
.../management/CarbonLoadDataCommand.scala | 23 +++++----
.../preaaggregate/PreAggregateListeners.scala | 4 +-
.../schema/CarbonAlterTableRenameCommand.scala | 6 ++-
10 files changed, 118 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 7caad43..0457e85 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -16,12 +16,16 @@
*/
package org.apache.carbondata.events
+import java.util
+
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.CompactionType
/**
*
@@ -144,37 +148,53 @@ case class AlterTableRenameAbortEvent(
sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
-case class AlterTableCompactionPreEvent(
+/**
+ * Event for handling pre compaction operations, listener has to implement this event on pre execution
+ *
+ * @param sparkSession
+ * @param carbonTable
+ */
+case class AlterTableCompactionPreEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
carbonMergerMapping: CarbonMergerMapping,
- mergedLoadName: String,
- sqlContext: SQLContext) extends Event with AlterTableCompactionEventInfo
-
+ mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
/**
- *
+ * Compaction Event for handling pre update status file operations, listener has to implement this
+ * event before updating the table status file
+ * @param sparkSession
* @param carbonTable
* @param carbonMergerMapping
* @param mergedLoadName
- * @param sQLContext
*/
-case class AlterTableCompactionPostEvent(
+case class AlterTableCompactionPostEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
carbonMergerMapping: CarbonMergerMapping,
- mergedLoadName: String,
- sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
-
+ mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
+/**
+ * Compaction Event for handling pre update status file operations, listener has to implement this
+ * event before updating the table status file
+ * @param sparkSession
+ * @param carbonTable
+ * @param carbonMergerMapping
+ * @param carbonLoadModel
+ * @param mergedLoadName
+ */
+case class AlterTableCompactionPreStatusUpdateEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ carbonMergerMapping: CarbonMergerMapping,
+ carbonLoadModel: CarbonLoadModel,
+ mergedLoadName: String) extends Event with AlterTableCompactionStatusUpdateEventInfo
/**
- * Class for handling clean up in case of any failure and abort the operation
+ * Compaction Event for handling clean up in case of any compaction failure and abort the
+ * operation, listener has to implement this event to handle failure scenarios
*
* @param carbonTable
* @param carbonMergerMapping
* @param mergedLoadName
- * @param sQLContext
*/
-case class AlterTableCompactionAbortEvent(
+case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
carbonMergerMapping: CarbonMergerMapping,
- mergedLoadName: String,
- sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
+ mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 4af337b..8e69855 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -93,13 +93,21 @@ trait AlterTableAddColumnEventInfo {
/**
* event for alter_table_rename
*/
-trait AlterTableCompactionEventInfo {
+trait AlterTableCompactionStatusUpdateEventInfo {
val carbonTable: CarbonTable
val carbonMergerMapping: CarbonMergerMapping
val mergedLoadName: String
}
/**
+ * event for alter_table_compaction
+ */
+trait AlterTableCompactionEventInfo {
+ val sparkSession: SparkSession
+ val carbonTable: CarbonTable
+}
+
+/**
* event for DeleteSegmentById
*/
trait DeleteSegmentbyIdEventInfo {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
index 84dde84..022ad72 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
@@ -45,8 +45,8 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
/**
- * Class for handling operations after data load completion and before final commit of load
- * operation. Example usage: For loading pre-aggregate tables
+ * Event for handling operations after data load completion and before final
+ * commit of load operation. Example usage: For loading pre-aggregate tables
*/
case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession,
carbonTableIdentifier: CarbonTableIdentifier,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1d2934f..8f4af1b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -81,7 +81,8 @@ object CarbonDataRDDFactory {
storeLocation: String,
compactionType: CompactionType,
carbonTable: CarbonTable,
- compactionModel: CompactionModel): Unit = {
+ compactionModel: CompactionModel,
+ operationContext: OperationContext): Unit = {
// taking system level lock at the mdt file location
var configuredMdtPath = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_UPDATE_SYNC_FOLDER,
@@ -114,7 +115,8 @@ object CarbonDataRDDFactory {
carbonLoadModel,
storeLocation,
compactionModel,
- lock
+ lock,
+ operationContext
)
} catch {
case e: Exception =>
@@ -150,7 +152,8 @@ object CarbonDataRDDFactory {
carbonLoadModel: CarbonLoadModel,
storeLocation: String,
compactionModel: CompactionModel,
- compactionLock: ICarbonLock): Unit = {
+ compactionLock: ICarbonLock,
+ operationContext: OperationContext): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
@@ -280,14 +283,15 @@ object CarbonDataRDDFactory {
def loadCarbonData(
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- storePath: String,
columnar: Boolean,
partitionStatus: SegmentStatus = SegmentStatus.SUCCESS,
result: Option[DictionaryServer],
overwriteTable: Boolean,
hadoopConf: Configuration,
dataFrame: Option[DataFrame] = None,
- updateModel: Option[UpdateTableModel] = None): Unit = {
+ updateModel: Option[UpdateTableModel] = None,
+ operationContext: OperationContext): Unit = {
+ val storePath: String = carbonLoadModel.getTablePath
LOGGER.audit(s"Data load request has been received for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
// Check if any load need to be deleted before loading new data
@@ -494,10 +498,12 @@ object CarbonDataRDDFactory {
throw new Exception("No Data to load")
}
writeDictionary(carbonLoadModel, result, writeAll = false)
- val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession,
+ val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+ LoadTablePreStatusUpdateEvent(
+ sqlContext.sparkSession,
carbonTable.getCarbonTableIdentifier,
carbonLoadModel)
- OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent)
+ OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
if (!done) {
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
@@ -518,7 +524,7 @@ object CarbonDataRDDFactory {
}
try {
// compaction handling
- handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable)
+ handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable, operationContext)
} catch {
case e: Exception =>
throw new Exception(
@@ -682,7 +688,8 @@ object CarbonDataRDDFactory {
private def handleSegmentMerging(
sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
- carbonTable: CarbonTable
+ carbonTable: CarbonTable,
+ operationContext: OperationContext
): Unit = {
LOGGER.info(s"compaction need status is" +
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
@@ -717,7 +724,8 @@ object CarbonDataRDDFactory {
storeLocation,
CompactionType.MINOR,
carbonTable,
- compactionModel
+ compactionModel,
+ operationContext
)
} else {
val lock = CarbonLockFactory.getCarbonLockObj(
@@ -731,7 +739,8 @@ object CarbonDataRDDFactory {
carbonLoadModel,
storeLocation,
compactionModel,
- lock
+ lock,
+ operationContext
)
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 3ebc957..5f5a3d1 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.MergeResultImpl
@@ -147,7 +147,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
// trigger event for compaction
val operationContext = new OperationContext
val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
- AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
+ AlterTableCompactionPreEvent(sqlContext.sparkSession,
+ carbonTable,
+ carbonMergerMapping,
+ mergedLoadName)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
var execInstance = "1"
@@ -195,9 +198,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false)
// trigger event for compaction
- val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
- AlterTableCompactionPostEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
- OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
+ val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =
+ AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
+ carbonTable,
+ carbonMergerMapping,
+ carbonLoadModel,
+ mergedLoadName)
+ OperationListenerBus.getInstance
+ .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)
val endTime = System.nanoTime()
LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index a9b5455..e6ee535 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -271,7 +271,7 @@ object CarbonSession {
.addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
.addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
.addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
- .addListener(classOf[AlterTableCompactionPostEvent],
+ .addListener(classOf[AlterTableCompactionPreStatusUpdateEvent],
AlterPreAggregateTableCompactionPostListener)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 462b055..5fdf62a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
@@ -83,12 +84,18 @@ case class CarbonAlterTableCompactionCommand(
CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
System.getProperty("java.io.tmpdir"))
storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+ // trigger event for compaction
+ val operationContext = new OperationContext
+ val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+ AlterTableCompactionPreEvent(sparkSession, table, null, null)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
try {
alterTableForCompaction(
sparkSession.sqlContext,
alterTableModel,
carbonLoadModel,
- storeLocation)
+ storeLocation,
+ operationContext)
} catch {
case e: Exception =>
if (null != e.getMessage) {
@@ -99,13 +106,18 @@ case class CarbonAlterTableCompactionCommand(
"Exception in compaction. Please check logs for more info.")
}
}
+ // trigger event for compaction
+ val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
+ AlterTableCompactionPostEvent(sparkSession, table, null, null)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
Seq.empty
}
private def alterTableForCompaction(sqlContext: SQLContext,
alterTableModel: AlterTableModel,
carbonLoadModel: CarbonLoadModel,
- storeLocation: String): Unit = {
+ storeLocation: String,
+ operationContext: OperationContext): Unit = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
val compactionSize: Long = CarbonDataMergerUtil.getCompactionSize(compactionType)
@@ -167,7 +179,8 @@ case class CarbonAlterTableCompactionCommand(
storeLocation,
compactionType,
carbonTable,
- compactionModel
+ compactionModel,
+ operationContext
)
} else {
// normal flow of compaction
@@ -194,7 +207,8 @@ case class CarbonAlterTableCompactionCommand(
carbonLoadModel,
storeLocation,
compactionModel,
- lock
+ lock,
+ operationContext
)
} catch {
case e: Exception =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f642785..ebdaa33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -39,8 +39,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext}
-import org.apache.carbondata.events.{LoadTablePreExecutionEvent, OperationListenerBus}
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -190,14 +189,16 @@ case class CarbonLoadDataCommand(
carbonLoadModel,
columnar,
partitionStatus,
- hadoopConf)
+ hadoopConf,
+ operationContext)
} else {
loadData(
sparkSession,
carbonLoadModel,
columnar,
partitionStatus,
- hadoopConf)
+ hadoopConf,
+ operationContext)
}
val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
new LoadTablePostExecutionEvent(sparkSession,
@@ -253,7 +254,8 @@ case class CarbonLoadDataCommand(
carbonLoadModel: CarbonLoadModel,
columnar: Boolean,
partitionStatus: SegmentStatus,
- hadoopConf: Configuration): Unit = {
+ hadoopConf: Configuration,
+ operationContext: OperationContext): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
.getCarbonTableIdentifier
@@ -314,14 +316,14 @@ case class CarbonLoadDataCommand(
}
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getTablePath,
columnar,
partitionStatus,
server,
isOverwriteTable,
hadoopConf,
dataFrame,
- updateModel)
+ updateModel,
+ operationContext)
}
private def loadData(
@@ -329,7 +331,8 @@ case class CarbonLoadDataCommand(
carbonLoadModel: CarbonLoadModel,
columnar: Boolean,
partitionStatus: SegmentStatus,
- hadoopConf: Configuration): Unit = {
+ hadoopConf: Configuration,
+ operationContext: OperationContext): Unit = {
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val fields = dataFrame.get.schema.fields
import org.apache.spark.sql.functions.udf
@@ -365,14 +368,14 @@ case class CarbonLoadDataCommand(
}
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
- carbonLoadModel.getTablePath,
columnar,
partitionStatus,
None,
isOverwriteTable,
hadoopConf,
loadDataFrame,
- updateModel)
+ updateModel,
+ operationContext)
}
private def updateTableMetadata(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 9168247..4315e05 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -70,10 +70,10 @@ object AlterPreAggregateTableCompactionPostListener extends OperationEventListen
* @param operationContext
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
- val compactionEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
+ val compactionEvent = event.asInstanceOf[AlterTableCompactionPreStatusUpdateEvent]
val carbonTable = compactionEvent.carbonTable
val compactionType = compactionEvent.carbonMergerMapping.campactionType
- val sparkSession = compactionEvent.sQLContext.sparkSession
+ val sparkSession = compactionEvent.sparkSession
if (carbonTable.hasDataMapSchema) {
carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
val childRelationIdentifier = dataMapSchema.getRelationIdentifier
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0da0a4f6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 6bf55db..1766064 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql.execution.command.schema
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
@@ -170,12 +172,14 @@ private[sql] case class CarbonAlterTableRenameCommand(
AlterTableUtil.releaseLocks(locks)
// case specific to rename table as after table rename old table path will not be found
if (carbonTable != null) {
+ val newTablePath = CarbonUtil
+ .getNewTablePath(new Path(carbonTable.getTablePath), newTableName)
AlterTableUtil
.releaseLocksManually(locks,
locksToBeAcquired,
oldDatabaseName,
newTableName,
- carbonTable.getTablePath)
+ newTablePath)
}
}
Seq.empty