You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/29 05:49:07 UTC
carbondata git commit: [CARBONDATA-1592] Added event listeners
Repository: carbondata
Updated Branches:
refs/heads/master 7e124f4f1 -> 9e0fd5ffe
[CARBONDATA-1592] Added event listeners
Description: Added an event listener interface to CarbonData. This allows the behaviour of existing commands to be extended by listeners that perform additional operations.
This closes #1562
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9e0fd5ff
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9e0fd5ff
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9e0fd5ff
Branch: refs/heads/master
Commit: 9e0fd5ffe3dfad001a95ff592644a34c448d4843
Parents: 7e124f4
Author: Manohar <ma...@gmail.com>
Authored: Fri Nov 24 15:11:59 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 29 13:48:57 2017 +0800
----------------------------------------------------------------------
.../apache/carbondata/core/util/CarbonUtil.java | 6 ---
.../carbondata/events/AlterTableEvents.scala | 27 ++++++++++--
.../events/CreateDatabaseEvents.scala | 29 +++++++++++++
.../carbondata/events/CreateTableEvents.scala | 43 ++++++++++++++++++++
.../apache/carbondata/events/IUDEvents.scala | 12 ++++--
.../apache/carbondata/events/LoadEvents.scala | 6 ++-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 4 +-
.../apache/carbondata/spark/rdd/Compactor.scala | 12 +++++-
.../scala/org/apache/spark/util/FileUtils.scala | 13 ++++++
.../spark/rdd/CarbonDataRDDFactory.scala | 7 ----
.../spark/sql/CarbonDictionaryDecoder.scala | 2 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 7 +++-
.../execution/BatchedDataSourceScanExec.scala | 4 +-
.../command/CarbonCreateTableCommand.scala | 13 +++++-
.../command/management/CleanFilesCommand.scala | 10 +++++
.../command/management/LoadTableCommand.scala | 15 +++++++
.../mutation/ProjectForDeleteCommand.scala | 4 +-
.../mutation/ProjectForUpdateCommand.scala | 5 +--
.../CarbonAlterTableAddColumnCommand.scala | 11 +++--
.../CarbonAlterTableDataTypeChangeCommand.scala | 14 +++++--
.../strategy/CarbonLateDecodeStrategy.scala | 2 +-
.../sql/execution/strategy/DDLStrategy.scala | 3 ++
22 files changed, 204 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 02fe054..1175cf0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2026,12 +2026,6 @@ public final class CarbonUtil {
}
}
- public static void createDatabaseDirectory(String dbName, String storePath) throws IOException {
- String databasePath = storePath + File.separator + dbName.toLowerCase();
- FileFactory.FileType fileType = FileFactory.getFileType(databasePath);
- FileFactory.mkdirs(databasePath, fileType);
- }
-
public static void dropDatabaseDirectory(String dbName, String storePath)
throws IOException, InterruptedException {
String databasePath = storePath + File.separator + dbName;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 5a818fc..1a0c305 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -40,11 +40,21 @@ case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable,
* @param carbonTable
* @param alterTableDataTypeChangeModel
*/
-case class AlterTableDataTypeChangePreEvent(carbonTable: CarbonTable,
+case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
extends Event with AlterTableDataTypeChangeEventInfo
/**
+ * Class for handling operations after an alter table data type change
+ *
+ * @param carbonTable
+ * @param alterTableDataTypeChangeModel
+ */
+case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+ alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
+ extends Event with AlterTableDataTypeChangeEventInfo
+
+/**
*
* @param carbonTable
* @param alterTableDropColumnModel
@@ -82,7 +92,16 @@ case class AlterTableRenamePreEvent(carbonTable: CarbonTable,
* @param carbonTable
* @param alterTableAddColumnsModel
*/
-case class AlterTableAddColumnPreEvent(carbonTable: CarbonTable,
+case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+ alterTableAddColumnsModel: AlterTableAddColumnsModel)
+ extends Event with AlterTableAddColumnEventInfo
+
+/**
+ *
+ * @param carbonTable
+ * @param alterTableAddColumnsModel
+ */
+case class AlterTableAddColumnPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
alterTableAddColumnsModel: AlterTableAddColumnsModel)
extends Event with AlterTableAddColumnEventInfo
@@ -118,7 +137,7 @@ case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
* @param mergedLoadName
* @param sQLContext
*/
-case class AlterTableCompactionPreEvent(carbonTable: CarbonTable,
+case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
carbonLoadModel: CarbonLoadModel,
mergedLoadName: String,
sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
@@ -131,7 +150,7 @@ case class AlterTableCompactionPreEvent(carbonTable: CarbonTable,
* @param mergedLoadName
* @param sQLContext
*/
-case class AlterTableCompactionPostEvent(carbonTable: CarbonTable,
+case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
carbonLoadModel: CarbonLoadModel,
mergedLoadName: String,
sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
new file mode 100644
index 0000000..dae22b1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateDatabaseEvents.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+
+case class CreateDatabasePreExecutionEvent(databaseName: String) extends Event
+ with DatabaseEventInfo
+
+case class CreateDatabasePostExecutionEvent(databaseName: String, dataBasePath: String)
+ extends Event with DatabaseEventInfo
+
+case class CreateDatabaseAbortExecutionEvent(databaseName: String)
+ extends Event with DatabaseEventInfo
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
new file mode 100644
index 0000000..3b03e4d
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.events
+
+import org.apache.spark.sql._
+
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+
+/**
+ * Class for handling operations before the start of a create table process.
+ * Example usage: For validation purposes
+ */
+case class CreateTablePreExecutionEvent(sparkSession: SparkSession,
+ carbonTableIdentifier: CarbonTableIdentifier,
+ storePath: String) extends Event with TableEventInfo
+
+/**
+ * Class for handling operations after the table has been created and before
+ * the creation is written to the audit log.
+ */
+case class CreateTablePostExecutionEvent(sparkSession: SparkSession,
+ carbonTableIdentifier: CarbonTableIdentifier) extends Event with TableEventInfo
+
+/**
+ * Class for handling clean up in case of any failure and abort the operation.
+ */
+case class CreateTableAbortExecutionEvent(sparkSession: SparkSession,
+ carbonTableIdentifier: CarbonTableIdentifier) extends Event with TableEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
index deebb65..1bd94d4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/IUDEvents.scala
@@ -16,20 +16,24 @@
*/
package org.apache.carbondata.events
+import org.apache.spark.sql.SparkSession
+
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
/**
*
* @param carbonTable
*/
-case class UpdateTablePreEvent(carbonTable: CarbonTable) extends Event with UpdateTableEventInfo
+case class UpdateTablePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
+ extends Event with UpdateTableEventInfo
/**
*
* @param carbonTable
*/
-case class UpdateTablePostEvent(carbonTable: CarbonTable) extends Event with UpdateTableEventInfo
+case class UpdateTablePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
+ extends Event with UpdateTableEventInfo
/**
@@ -42,7 +46,7 @@ case class UpdateTableAbortEvent(carbonTable: CarbonTable) extends Event with Up
*
* @param carbonTable
*/
-case class DeleteFromTablePreEvent(carbonTable: CarbonTable)
+case class DeleteFromTablePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
extends Event with DeleteFromTableEventInfo
@@ -50,7 +54,7 @@ case class DeleteFromTablePreEvent(carbonTable: CarbonTable)
*
* @param carbonTable
*/
-case class DeleteFromTablePostEvent(carbonTable: CarbonTable)
+case class DeleteFromTablePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable)
extends Event with DeleteFromTableEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
index e3833d8..d87a1af 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
@@ -28,7 +28,11 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
*/
case class LoadTablePreExecutionEvent(sparkSession: SparkSession,
carbonTableIdentifier: CarbonTableIdentifier,
- carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
+ carbonLoadModel: CarbonLoadModel,
+ factPath: String,
+ isDataFrameDefined: Boolean,
+ optionsFinal: scala.collection
+ .mutable.Map[String, String]) extends Event with LoadEventInfo
/**
* Class for handling operations after data load completion and before final
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 7eaf95a..171b71b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -56,8 +56,8 @@ import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
*/
class CarbonScanRDD(
@transient sc: SparkContext,
- columnProjection: CarbonProjection,
- filterExpression: Expression,
+ val columnProjection: CarbonProjection,
+ var filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
@transient serializedTableInfo: Array[Byte],
@transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index ba634b7..e41211a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -69,7 +69,11 @@ object Compactor {
// trigger event for compaction
val operationContext = new OperationContext
val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
- AlterTableCompactionPreEvent(carbonTable, carbonLoadModel, mergedLoadName, sc)
+ AlterTableCompactionPreEvent(compactionCallableModel.sqlContext.sparkSession,
+ carbonTable,
+ carbonLoadModel,
+ mergedLoadName,
+ sc)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
var execInstance = "1"
@@ -118,7 +122,11 @@ object Compactor {
// trigger event for compaction
val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
- AlterTableCompactionPostEvent(carbonTable, carbonLoadModel, mergedLoadName, sc)
+ AlterTableCompactionPostEvent(compactionCallableModel.sqlContext.sparkSession,
+ carbonTable,
+ carbonLoadModel,
+ mergedLoadName,
+ sc)
OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
val endTime = System.nanoTime()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
index 681910d..b60d030 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark.util
+import java.io.{File, IOException}
+
import org.apache.hadoop.conf.Configuration
import org.apache.carbondata.common.logging.LogServiceFactory
@@ -24,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.events.{CreateDatabasePostExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.exception.DataLoadingException
object FileUtils {
@@ -102,4 +105,14 @@ object FileUtils {
}
}
+ /**
+ * Creates the directory of a database below the given store path and then fires a
+ * CreateDatabasePostExecutionEvent on the OperationListenerBus.
+ *
+ * @param dbName name of the database; it is lower-cased when building the directory path
+ * @param storePath path under which the database directory is created
+ */
+ def createDatabaseDirectory(dbName: String, storePath: String): Unit = {
+ val dbDirectory: String = storePath + File.separator + dbName.toLowerCase
+ FileFactory.mkdirs(dbDirectory, FileFactory.getFileType(dbDirectory))
+ // the event carries the database name exactly as passed in, plus the directory just created
+ val context = new OperationContext
+ val postEvent = CreateDatabasePostExecutionEvent(dbName, dbDirectory)
+ OperationListenerBus.getInstance.fireEvent(postEvent, context)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 351d765..145068f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -496,13 +496,6 @@ object CarbonDataRDDFactory {
throw new Exception("No Data to load")
}
writeDictionary(carbonLoadModel, result, writeAll = false)
- // Register a handler here for executing tasks required before committing
- // the load operation to a table status file
- val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
- LoadTablePostExecutionEvent(sqlContext.sparkSession,
- carbonTable.getCarbonTableIdentifier,
- carbonLoadModel)
- OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
if (!done) {
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 9d88c4c..1a336c4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -461,7 +461,7 @@ class CarbonDecoderRDD(
relations: Seq[CarbonDecoderRelation],
profile: CarbonProfile,
aliasMap: CarbonAliasDecoderRelation,
- prev: RDD[InternalRow],
+ val prev: RDD[InternalRow],
output: Seq[Attribute],
serializedTableInfo: Array[Byte])
extends CarbonRDDWithTableInfo[InternalRow](prev, serializedTableInfo) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index acef2e1..592e2c9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -26,11 +26,13 @@ import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, Carbo
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonSessionInfo, SessionParams, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util._
import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationListenerBus}
+import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.rdd.SparkReadSupport
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
+
/**
* Carbon Environment for unified context
*/
@@ -80,9 +82,10 @@ class CarbonEnv {
}
LOGGER.info(s"carbon env initial: $storePath")
// trigger event for CarbonEnv init
+ val operationContext = new OperationContext
val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
CarbonEnvInitPreEvent(sparkSession, storePath)
- OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent)
+ OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent, operationContext)
CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
index f08c111..3c3efaa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/BatchedDataSourceScanExec.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.DataType
@@ -35,7 +36,8 @@ case class BatchedDataSourceScanExec(
@transient relation: BaseRelation,
override val outputPartitioning: Partitioning,
override val metadata: Map[String, String],
- override val metastoreTableIdentifier: Option[TableIdentifier])
+ override val metastoreTableIdentifier: Option[TableIdentifier],
+ @transient logicalRelation: LogicalRelation)
extends DataSourceScanExec with CodegenSupport {
override lazy val metrics =
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
index 06f3645..f9b8b9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/CarbonCreateTableCommand.scala
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.exception.InvalidConfigurationException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
case class CarbonCreateTableCommand(
cm: TableModel,
@@ -78,6 +79,12 @@ case class CarbonCreateTableCommand(
}
} else {
val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tbName)
+ val operationContext = new OperationContext
+ val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
+ new CreateTablePreExecutionEvent(sparkSession,
+ tableIdentifier.getCarbonTableIdentifier,
+ tablePath)
+ OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
// Add Database to catalog and persist
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
@@ -93,7 +100,7 @@ case class CarbonCreateTableCommand(
|(${ fields.map(f => f.rawSchema).mkString(",") })
|USING org.apache.spark.sql.CarbonSource""".stripMargin +
s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
+ s""""$tablePath", path "$tablePath" $carbonSchemaString) """.stripMargin)
} catch {
case e: AnalysisException => throw e
case e: Exception =>
@@ -107,7 +114,9 @@ case class CarbonCreateTableCommand(
CarbonException.analysisException(msg)
}
}
-
+ val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
+ new CreateTablePostExecutionEvent(sparkSession, tableIdentifier.getCarbonTableIdentifier)
+ OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
}
Seq.empty
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
index 7625579..d6604d8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CleanFilesCommand.scala
@@ -45,6 +45,12 @@ case class CleanFilesCommand(
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
+ val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
+ val operationContext = new OperationContext
+ val cleanFilesPreEvent: CleanFilesPreEvent =
+ CleanFilesPreEvent(carbonTable,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent, operationContext)
if (tableName.isDefined) {
Checker.validateTableExists(databaseNameOp, tableName.get, sparkSession)
if (forceTableClean) {
@@ -55,6 +61,10 @@ case class CleanFilesCommand(
} else {
cleanGarbageDataInAllTables(sparkSession)
}
+ // trigger the post event for clean files, on the same operation context as the pre event
+ val cleanFilesPostEvent: CleanFilesPostEvent =
+ CleanFilesPostEvent(carbonTable,
+ sparkSession)
+ OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent, operationContext)
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index d805227..0025011 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.statusmanager.SegmentStatus
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -139,6 +140,15 @@ case class LoadTableCommand(
carbonLoadModel,
hadoopConf
)
+ val operationContext = new OperationContext
+ val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+ new LoadTablePreExecutionEvent(sparkSession,
+ null,
+ carbonLoadModel,
+ factPath,
+ dataFrame.isDefined,
+ optionsFinal)
+ OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
try{
// First system has to partition the data first and then call the load data
@@ -187,6 +197,11 @@ case class LoadTableCommand(
partitionStatus,
hadoopConf)
}
+ val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+ new LoadTablePostExecutionEvent(sparkSession,
+ table.getCarbonTableIdentifier,
+ carbonLoadModel)
+ OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
} catch {
case CausedBy(ex: NoRetryException) =>
LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
index cf5bfd8..2b9caf7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForDeleteCommand.scala
@@ -56,7 +56,7 @@ private[sql] case class ProjectForDeleteCommand(
// trigger event for Delete from table
val operationContext = new OperationContext
val deleteFromTablePreEvent: DeleteFromTablePreEvent =
- DeleteFromTablePreEvent(carbonTable)
+ DeleteFromTablePreEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(deleteFromTablePreEvent, operationContext)
val metadataLock = CarbonLockFactory
@@ -85,7 +85,7 @@ private[sql] case class ProjectForDeleteCommand(
// trigger post event for Delete from table
val deleteFromTablePostEvent: DeleteFromTablePostEvent =
- DeleteFromTablePostEvent(carbonTable)
+ DeleteFromTablePostEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(deleteFromTablePostEvent, operationContext)
}
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
index da62f27..341f368 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/ProjectForUpdateCommand.scala
@@ -62,9 +62,8 @@ private[sql] case class ProjectForUpdateCommand(
// trigger event for Update table
val operationContext = new OperationContext
val updateTablePreEvent: UpdateTablePreEvent =
- UpdateTablePreEvent(carbonTable)
+ UpdateTablePreEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(updateTablePreEvent, operationContext)
-
val metadataLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
LockUsage.METADATA_LOCK)
@@ -117,7 +116,7 @@ private[sql] case class ProjectForUpdateCommand(
// trigger event for Update table
val updateTablePostEvent: UpdateTablePostEvent =
- UpdateTablePostEvent(carbonTable)
+ UpdateTablePostEvent(sparkSession, carbonTable)
OperationListenerBus.getInstance.fireEvent(updateTablePostEvent, operationContext)
} catch {
case e: HorizontalCompactionException =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index ed82140..0a720da 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{AlterTableAddColumnPreEvent, OperationListenerBus}
+import org.apache.carbondata.events.{AlterTableAddColumnPostEvent, AlterTableAddColumnPreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.rdd.{AlterTableAddColumnRDD, AlterTableDropColumnRDD}
@@ -59,9 +59,10 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
// up relation should be called after acquiring the lock
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val alterTableAddColumnListener = AlterTableAddColumnPreEvent(carbonTable,
+ val operationContext = new OperationContext
+ val alterTableAddColumnListener = AlterTableAddColumnPreEvent(sparkSession, carbonTable,
alterTableAddColumnsModel)
- OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener)
+ OperationListenerBus.getInstance().fireEvent(alterTableAddColumnListener, operationContext)
// get the latest carbon table and check for column existence
// read the latest schema file
val carbonTablePath = CarbonStorePath
@@ -93,6 +94,10 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
.updateSchemaInfo(carbonTable,
schemaConverter.fromWrapperToExternalSchemaEvolutionEntry(schemaEvolutionEntry),
thriftTable)(sparkSession)
+ val alterTablePostExecutionEvent: AlterTableAddColumnPostEvent =
+ new AlterTableAddColumnPostEvent(sparkSession,
+ carbonTable, alterTableAddColumnsModel)
+ OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table for add columns is successful for table $dbName.$tableName")
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index f63cf0b..6e2c83d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -28,7 +28,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{AlterTableAddColumnPreEvent, AlterTableDataTypeChangePreEvent, OperationListenerBus}
+import org.apache.carbondata.events.{AlterTableDataTypeChangePostEvent, AlterTableDataTypeChangePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.{ColumnSchema, SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataTypeConverterUtil}
@@ -52,9 +52,11 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
.validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
- val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(carbonTable,
- alterTableDataTypeChangeModel)
- OperationListenerBus.getInstance().fireEvent(alterTableDataTypeChangeListener)
+ val operationContext = new OperationContext
+ val alterTableDataTypeChangeListener = AlterTableDataTypeChangePreEvent(sparkSession,
+ carbonTable, alterTableDataTypeChangeModel)
+ OperationListenerBus.getInstance()
+ .fireEvent(alterTableDataTypeChangeListener, operationContext)
val columnName = alterTableDataTypeChangeModel.columnName
val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -96,6 +98,10 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
.setTime_stamp(System.currentTimeMillis)
AlterTableUtil.updateSchemaInfo(carbonTable, schemaEvolutionEntry, tableInfo)(sparkSession)
+ val alterTablePostExecutionEvent: AlterTableDataTypeChangePostEvent =
+ new AlterTableDataTypeChangePostEvent(sparkSession, carbonTable,
+ alterTableDataTypeChangeModel)
+ OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
} catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index aecddcb..2c23c57 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -329,7 +329,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
relation.relation,
getPartitioning(table.carbonTable, updateRequestedColumns),
metadata,
- relation.catalogTable.map(_.identifier))
+ relation.catalogTable.map(_.identifier), relation)
} else {
RowDataSourceScanExec(output,
scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9e0fd5ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 6ea743d..bee762a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -27,7 +27,9 @@ import org.apache.spark.sql.execution.command.partition.ShowCarbonPartitionsComm
import org.apache.spark.sql.execution.command.schema._
import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
+import org.apache.spark.util.FileUtils
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -78,6 +80,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
_, child: LogicalPlan, overwrite, _) =>
ExecutedCommandExec(LoadTableByInsertCommand(relation, child, overwrite)) :: Nil
case createDb@CreateDatabaseCommand(dbName, ifNotExists, _, _, _) =>
+ FileUtils.createDatabaseDirectory(dbName, CarbonProperties.getStorePath)
ExecutedCommandExec(createDb) :: Nil
case drop@DropDatabaseCommand(dbName, ifExists, isCascade) =>
ExecutedCommandExec(CarbonDropDatabaseCommand(drop)) :: Nil