You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/11/09 00:50:13 UTC
[5/6] carbondata git commit: [CARBONDATA-3064] Support separate audit log
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index c8c9a47..35b73d6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -39,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-import org.apache.carbondata.spark.util.CommonUtil
class MergeIndexEventListener extends OperationEventListener with Logging {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -47,7 +45,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
event match {
case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
- Audit.log(LOGGER, "Load post status event-listener called for merge index")
+ LOGGER.info("Load post status event-listener called for merge index")
val loadModel = preStatusUpdateEvent.getCarbonLoadModel
val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
val compactedSegments = loadModel.getMergedSegmentIds
@@ -73,7 +71,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
}
}
case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
- Audit.log(LOGGER, "Merge index for compaction called")
+ LOGGER.info("Merge index for compaction called")
val carbonTable = alterTableCompactionPostEvent.carbonTable
val mergedLoads = alterTableCompactionPostEvent.compactedLoads
val sparkSession = alterTableCompactionPostEvent.sparkSession
@@ -84,8 +82,6 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
val carbonMainTable = alterTableMergeIndexEvent.carbonTable
val sparkSession = alterTableMergeIndexEvent.sparkSession
if (!carbonMainTable.isStreamingSink) {
- Audit.log(LOGGER, s"Compaction request received for table " +
- s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
LOGGER.info(s"Merge Index request received for table " +
s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
val lock = CarbonLockFactory.getCarbonLockObj(
@@ -130,16 +126,11 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
clearBlockDataMapCache(carbonMainTable, validSegmentIds)
val requestMessage = "Compaction request completed for table " +
s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
- Audit.log(LOGGER, requestMessage)
LOGGER.info(requestMessage)
} else {
val lockMessage = "Not able to acquire the compaction lock for table " +
- s"${ carbonMainTable.getDatabaseName }.${
- carbonMainTable
- .getTableName
- }"
-
- Audit.log(LOGGER, lockMessage)
+ s"${ carbonMainTable.getDatabaseName }." +
+ s"${ carbonMainTable.getTableName}"
LOGGER.error(lockMessage)
CarbonException.analysisException(
"Table is already locked for compaction. Please try after some time.")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 081482c..271a19b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,10 +22,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
-import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
@@ -63,6 +61,11 @@ case class CarbonCreateDataMapCommand(
case _ => null
}
+ if (mainTable != null) {
+ setAuditTable(mainTable)
+ }
+ setAuditInfo(Map("provider" -> dmProviderName, "dmName" -> dataMapName) ++ dmProperties)
+
if (mainTable != null && !mainTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
@@ -75,7 +78,7 @@ case class CarbonCreateDataMapCommand(
}
}
- if (mainTable !=null && CarbonUtil.getFormatVersion(mainTable) != ColumnarFormatVersion.V3) {
+ if (mainTable != null && CarbonUtil.getFormatVersion(mainTable) != ColumnarFormatVersion.V3) {
throw new MalformedCarbonCommandException(s"Unsupported operation on table with " +
s"V1 or V2 format data")
}
@@ -153,7 +156,6 @@ case class CarbonCreateDataMapCommand(
systemFolderLocation, tableIdentifier, dmProviderName)
OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
operationContext)
- Audit.log(LOGGER, s"DataMap $dataMapName successfully added")
Seq.empty
}
@@ -192,5 +194,7 @@ case class CarbonCreateDataMapCommand(
}
Seq.empty
}
+
+ override protected def opName: String = "CREATE DATAMAP"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
index be62ce4..267fedd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -65,6 +65,8 @@ case class CarbonDataMapRebuildCommand(
)(sparkSession)
}
+ setAuditTable(table)
+
val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
provider.rebuild()
@@ -87,4 +89,5 @@ case class CarbonDataMapRebuildCommand(
Seq.empty
}
+ override protected def opName: String = "REBUILD DATAMAP"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index ae33aa8..3cee810 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -58,6 +58,7 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
tableIdentifier match {
case Some(table) =>
val carbonTable = CarbonEnv.getCarbonTable(table)(sparkSession)
+ setAuditTable(carbonTable)
Checker.validateTableExists(table.database, table.table, sparkSession)
if (carbonTable.hasDataMapSchema) {
dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList)
@@ -97,4 +98,6 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
Seq.empty
}
}
+
+ override protected def opName: String = "SHOW DATAMAP"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 67e2dee..38ec07d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -59,6 +58,7 @@ case class CarbonDropDataMapCommand(
var dataMapSchema: DataMapSchema = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ setAuditInfo(Map("dmName" -> dataMapName))
if (table.isDefined) {
val databaseNameOp = table.get.database
val tableName = table.get.table
@@ -78,6 +78,7 @@ case class CarbonDropDataMapCommand(
null
}
}
+ setAuditTable(mainTable)
val tableIdentifier =
AbsoluteTableIdentifier
.from(tablePath,
@@ -112,8 +113,6 @@ case class CarbonDropDataMapCommand(
locksToBeAcquired foreach {
lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
}
- Audit.log(LOGGER, s"Deleting datamap [$dataMapName] under table [$tableName]")
-
// drop index,mv datamap on the main table.
if (mainTable != null &&
DataMapStoreManager.getInstance().getAllDataMap(mainTable).size() > 0) {
@@ -242,4 +241,5 @@ case class CarbonDropDataMapCommand(
Seq.empty
}
+ override protected def opName: String = "DROP DATAMAP"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 8e338db..1b1d708 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -33,7 +33,6 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -67,6 +66,10 @@ case class CarbonAlterTableCompactionCommand(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = alterTableModel.tableName.toLowerCase
val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+ setAuditTable(dbName, tableName)
+ if (alterTableModel.customSegmentIds.nonEmpty) {
+ setAuditInfo(Map("segmentIds" -> alterTableModel.customSegmentIds.get.mkString(", ")))
+ }
table = if (tableInfoOp.isDefined) {
CarbonTable.buildFromTableInfo(tableInfoOp.get)
} else {
@@ -217,8 +220,6 @@ case class CarbonAlterTableCompactionCommand(
}
}
- Audit.log(LOGGER, s"Compaction request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (null == carbonLoadModel.getLoadMetadataDetails) {
@@ -314,8 +315,6 @@ case class CarbonAlterTableCompactionCommand(
throw e
}
} else {
- Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error(s"Not able to acquire the compaction lock for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
CarbonException.analysisException(
@@ -379,4 +378,8 @@ case class CarbonAlterTableCompactionCommand(
}
}
}
+
+ override protected def opName: String = {
+ s"ALTER TABLE COMPACTION ${alterTableModel.compactionType.toUpperCase}"
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
index ba20773..8df0217 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -33,6 +33,7 @@ case class CarbonAlterTableFinishStreaming(
extends MetadataCommand {
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession)
+ setAuditTable(carbonTable)
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val streamingLock = CarbonLockFactory.getCarbonLockObj(
carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
@@ -58,4 +59,6 @@ case class CarbonAlterTableFinishStreaming(
}
Seq.empty
}
+
+ override protected def opName: String = "ALTER TABLE FINISH STREAMING"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index a390191..4b5b4b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -57,7 +57,10 @@ case class CarbonCleanFilesCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
-
+ setAuditTable(carbonTable)
+ setAuditInfo(Map(
+ "force" -> forceTableClean.toString,
+ "internal" -> isInternalCleanCall.toString))
if (carbonTable.hasAggregationDataMap) {
cleanFileCommands = carbonTable.getTableInfo.getDataMapSchemaList.asScala.map {
dataMapSchema =>
@@ -150,4 +153,6 @@ case class CarbonCleanFilesCommand(
.error("Failed to clean in progress segments", e)
}
}
+
+ override protected def opName: String = "CLEAN FILES"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
index bf5adc3..d1a54d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
@@ -47,6 +47,8 @@ case class CarbonCliCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+ setAuditTable(carbonTable)
+ setAuditInfo(Map("options" -> commandOptions))
val commandArgs: Seq[String] = commandOptions.split("\\s+")
val finalCommands = commandArgs.collect {
case a if a.trim.equalsIgnoreCase("summary") || a.trim.equalsIgnoreCase("benchmark") =>
@@ -59,4 +61,6 @@ case class CarbonCliCommand(
Row(x)
)
}
+
+ override protected def opName: String = "CLI"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 165a032..275a0fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -35,7 +35,8 @@ case class CarbonDeleteLoadByIdCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-
+ setAuditTable(carbonTable)
+ setAuditInfo(Map("segmentIds" -> loadIds.mkString(", ")))
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
@@ -66,4 +67,6 @@ case class CarbonDeleteLoadByIdCommand(
OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent, operationContext)
Seq.empty
}
+
+ override protected def opName: String = "DELETE SEGMENT BY ID"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index 19f5100..db1b7b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -36,7 +36,8 @@ case class CarbonDeleteLoadByLoadDateCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-
+ setAuditTable(carbonTable)
+ setAuditInfo(Map("date" -> dateField))
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
@@ -66,4 +67,6 @@ case class CarbonDeleteLoadByLoadDateCommand(
Seq.empty
}
+
+ override protected def opName: String = "DELETE SEGMENT BY DATE"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index ee0f5ab..2a64b19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -36,6 +36,7 @@ case class CarbonInsertIntoCommand(
var loadCommand: CarbonLoadDataCommand = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
def containsLimit(plan: LogicalPlan): Boolean = {
plan find {
@@ -82,9 +83,19 @@ case class CarbonInsertIntoCommand(
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (null != loadCommand) {
- loadCommand.processData(sparkSession)
+ val rows = loadCommand.processData(sparkSession)
+ setAuditInfo(loadCommand.auditInfo)
+ rows
} else {
Seq.empty
}
}
+
+ override protected def opName: String = {
+ if (overwrite) {
+ "INSERT OVERWRITE"
+ } else {
+ "INSERT INTO"
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 5fbe82e..bba8af7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -48,9 +48,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.Strings
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -65,12 +64,11 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util._
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.events.exception.PreEventException
-import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -113,6 +111,7 @@ case class CarbonLoadDataCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+ setAuditTable(dbName, tableName)
table = if (tableInfoOp.isDefined) {
CarbonTable.buildFromTableInfo(tableInfoOp.get)
} else {
@@ -123,7 +122,6 @@ case class CarbonLoadDataCommand(
}
if (null == relation.carbonTable) {
LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
- Audit.log(LOGGER, s"Data loading failed. table not found: $dbName.$tableName")
throw new NoSuchTableException(dbName, tableName)
}
relation.carbonTable
@@ -189,13 +187,12 @@ case class CarbonLoadDataCommand(
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val hadoopConf = sparkSession.sessionState.newHadoopConf()
val carbonLoadModel = new CarbonLoadModel()
- try {
- val tableProperties = table.getTableInfo.getFactTable.getTableProperties
- val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
- optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
- carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
- carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
- CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+ val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
+ optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+ carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+ carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
optionsFinal
.put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
@@ -216,176 +213,158 @@ case class CarbonLoadDataCommand(
CompressorFactory.getInstance().getCompressor.getName)
carbonLoadModel.setColumnCompressor(columnCompressor)
- val javaPartition = mutable.Map[String, String]()
- partition.foreach { case (k, v) =>
- if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+ val javaPartition = mutable.Map[String, String]()
+ partition.foreach { case (k, v) =>
+ if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+ }
+
+ new CarbonLoadModelBuilder(table).build(
+ options.asJava,
+ optionsFinal,
+ carbonLoadModel,
+ hadoopConf,
+ javaPartition.asJava,
+ dataFrame.isDefined)
+ // Delete stale segment folders that are not in table status but are physically present in
+ // the Fact folder
+ LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+ TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+ var isUpdateTableStatusRequired = false
+ // if the table is a child datamap, take the uuid from the operation context, as the parent
+ // would have already generated it.
+ // if the table has an aggregation datamap, generate a new UUID; otherwise use empty.
+ val uuid = if (table.isChildDataMap) {
+ Option(operationContext.getProperty("uuid")).getOrElse("").toString
+ } else if (table.hasAggregationDataMap) {
+ UUID.randomUUID().toString
+ } else {
+ ""
+ }
+ try {
+ operationContext.setProperty("uuid", uuid)
+ val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+ new LoadTablePreExecutionEvent(
+ table.getCarbonTableIdentifier,
+ carbonLoadModel)
+ operationContext.setProperty("isOverwrite", isOverwriteTable)
+ OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
+ // Add pre event listener for index datamap
+ val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
+ val dataMapOperationContext = new OperationContext()
+ if (tableDataMaps.size() > 0) {
+ val dataMapNames: mutable.Buffer[String] =
+ tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+ val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+ new BuildDataMapPreExecutionEvent(sparkSession,
+ table.getAbsoluteTableIdentifier, dataMapNames)
+ OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
+ dataMapOperationContext)
+ }
+ // First system has to partition the data first and then call the load data
+ LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+ concurrentLoadLock = acquireConcurrentLoadLock()
+ // Clean up the old invalid segment data before creating a new entry for new load.
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+ // add the start entry for the new load in the table status file
+ if (updateModel.isEmpty && !table.isHivePartitionTable) {
+ CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+ carbonLoadModel,
+ isOverwriteTable)
+ isUpdateTableStatusRequired = true
+ }
+ if (isOverwriteTable) {
+ LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+ }
+ // if table is an aggregate table then disable single pass.
+ if (carbonLoadModel.isAggLoadRequest) {
+ carbonLoadModel.setUseOnePass(false)
}
- new CarbonLoadModelBuilder(table).build(
- options.asJava,
- optionsFinal,
- carbonLoadModel,
- hadoopConf,
- javaPartition.asJava,
- dataFrame.isDefined)
- // Delete stale segment folders that are not in table status but are physically present in
- // the Fact folder
- LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
- TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
- var isUpdateTableStatusRequired = false
- // if the table is child then extract the uuid from the operation context and the parent would
- // already generated UUID.
- // if parent table then generate a new UUID else use empty.
- val uuid = if (table.isChildDataMap) {
- Option(operationContext.getProperty("uuid")).getOrElse("").toString
- } else if (table.hasAggregationDataMap) {
- UUID.randomUUID().toString
- } else {
- ""
+ // start dictionary server when use one pass load and dimension with DICTIONARY
+ // encoding is present.
+ val allDimensions =
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+ val createDictionary = allDimensions.exists {
+ carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+ !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
}
- try {
- operationContext.setProperty("uuid", uuid)
- val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
- new LoadTablePreExecutionEvent(
- table.getCarbonTableIdentifier,
- carbonLoadModel)
- operationContext.setProperty("isOverwrite", isOverwriteTable)
- OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
- // Add pre event listener for index datamap
- val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
- val dataMapOperationContext = new OperationContext()
- if (tableDataMaps.size() > 0) {
- val dataMapNames: mutable.Buffer[String] =
- tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
- val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
- new BuildDataMapPreExecutionEvent(sparkSession,
- table.getAbsoluteTableIdentifier, dataMapNames)
- OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
- dataMapOperationContext)
- }
- // First system has to partition the data first and then call the load data
- LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
- concurrentLoadLock = acquireConcurrentLoadLock()
- // Clean up the old invalid segment data before creating a new entry for new load.
- SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
- // add the start entry for the new load in the table status file
- if (updateModel.isEmpty && !table.isHivePartitionTable) {
- CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
- carbonLoadModel,
- isOverwriteTable)
- isUpdateTableStatusRequired = true
- }
- if (isOverwriteTable) {
- LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
- }
- // if table is an aggregate table then disable single pass.
- if (carbonLoadModel.isAggLoadRequest) {
- carbonLoadModel.setUseOnePass(false)
+ if (!createDictionary) {
+ carbonLoadModel.setUseOnePass(false)
+ }
+ // Create table and metadata folders if not exist
+ if (carbonLoadModel.isCarbonTransactionalTable) {
+ val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+ val fileType = FileFactory.getFileType(metadataDirectoryPath)
+ if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+ FileFactory.mkdirs(metadataDirectoryPath, fileType)
}
+ } else {
+ carbonLoadModel.setSegmentId(System.currentTimeMillis().toString)
+ }
+ val partitionStatus = SegmentStatus.SUCCESS
+ val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+ if (carbonLoadModel.getUseOnePass) {
+ loadDataUsingOnePass(
+ sparkSession,
+ carbonProperty,
+ carbonLoadModel,
+ columnar,
+ partitionStatus,
+ hadoopConf,
+ operationContext,
+ LOGGER)
+ } else {
+ loadData(
+ sparkSession,
+ carbonLoadModel,
+ columnar,
+ partitionStatus,
+ hadoopConf,
+ operationContext,
+ LOGGER)
+ }
+ val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+ new LoadTablePostExecutionEvent(
+ table.getCarbonTableIdentifier,
+ carbonLoadModel)
+ OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+ if (tableDataMaps.size() > 0) {
+ val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
+ table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
+ OperationListenerBus.getInstance()
+ .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+ }
- // start dictionary server when use one pass load and dimension with DICTIONARY
- // encoding is present.
- val allDimensions =
- carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
- val createDictionary = allDimensions.exists {
- carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
- !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
- }
- if (!createDictionary) {
- carbonLoadModel.setUseOnePass(false)
- }
- // Create table and metadata folders if not exist
- if (carbonLoadModel.isCarbonTransactionalTable) {
- val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
- val fileType = FileFactory.getFileType(metadataDirectoryPath)
- if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
- FileFactory.mkdirs(metadataDirectoryPath, fileType)
- }
- } else {
- carbonLoadModel.setSegmentId(System.currentTimeMillis().toString)
- }
- val partitionStatus = SegmentStatus.SUCCESS
- val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
- if (carbonLoadModel.getUseOnePass) {
- loadDataUsingOnePass(
- sparkSession,
- carbonProperty,
- carbonLoadModel,
- columnar,
- partitionStatus,
- hadoopConf,
- operationContext,
- LOGGER)
- } else {
- loadData(
- sparkSession,
- carbonLoadModel,
- columnar,
- partitionStatus,
- hadoopConf,
- operationContext,
- LOGGER)
- }
- val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
- new LoadTablePostExecutionEvent(
- table.getCarbonTableIdentifier,
- carbonLoadModel)
- OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
- if (tableDataMaps.size() > 0) {
- val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
- table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
- OperationListenerBus.getInstance()
- .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+ } catch {
+ case CausedBy(ex: NoRetryException) =>
+ // update the load entry in table status file for changing the status to marked for delete
+ if (isUpdateTableStatusRequired) {
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
-
- } catch {
- case CausedBy(ex: NoRetryException) =>
- // update the load entry in table status file for changing the status to marked for delete
- if (isUpdateTableStatusRequired) {
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
- }
- LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
- throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
- // In case of event related exception
- case preEventEx: PreEventException =>
- LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
- throw new AnalysisException(preEventEx.getMessage)
- case ex: Exception =>
- LOGGER.error(ex)
- // update the load entry in table status file for changing the status to marked for delete
- if (isUpdateTableStatusRequired) {
- CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
- }
- Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. Please check the logs")
- throw ex
- } finally {
- releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
- // Once the data load is successful delete the unwanted partition files
- try {
- val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
- table.getDatabaseName + "/" +
- table.getTableName + "/"
- val fileType = FileFactory.getFileType(partitionLocation)
- if (FileFactory.isFileExist(partitionLocation, fileType)) {
- val file = FileFactory.getCarbonFile(partitionLocation, fileType)
- CarbonUtil.deleteFoldersAndFiles(file)
- }
- } catch {
- case ex: Exception =>
- LOGGER.error(ex)
- Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. " +
- "Problem deleting the partition folder")
- throw ex
+ LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+ throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+ // In case of event related exception
+ case preEventEx: PreEventException =>
+ LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+ throw new AnalysisException(preEventEx.getMessage)
+ case ex: Exception =>
+ LOGGER.error(ex)
+ // update the load entry in table status file for changing the status to marked for delete
+ if (isUpdateTableStatusRequired) {
+ CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
-
+ throw ex
+ } finally {
+ releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
+ // Delete the unwanted partition files after the load attempt, whether it succeeded or failed
+ val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
+ table.getDatabaseName + "/" +
+ table.getTableName + "/"
+ val fileType = FileFactory.getFileType(partitionLocation)
+ if (FileFactory.isFileExist(partitionLocation, fileType)) {
+ val file = FileFactory.getCarbonFile(partitionLocation, fileType)
+ CarbonUtil.deleteFoldersAndFiles(file)
}
- } catch {
- case dle: DataLoadingException =>
- Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
- throw dle
- case mce: MalformedCarbonCommandException =>
- Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
- throw mce
}
Seq.empty
}
@@ -543,7 +522,7 @@ case class CarbonLoadDataCommand(
}
}
} else {
- CarbonDataRDDFactory.loadCarbonData(
+ val loadResult = CarbonDataRDDFactory.loadCarbonData(
sparkSession.sqlContext,
carbonLoadModel,
columnar,
@@ -554,10 +533,25 @@ case class CarbonLoadDataCommand(
loadDataFrame,
updateModel,
operationContext)
+ if (loadResult != null) {
+ val info = makeAuditInfo(loadResult)
+ setAuditInfo(info)
+ }
}
rows
}
+ private def makeAuditInfo(loadResult: LoadMetadataDetails): Map[String, String] = {
+ if (loadResult != null) {
+ Map(
+ "SegmentId" -> loadResult.getLoadName,
+ "DataSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getDataSize)),
+ "IndexSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getIndexSize)))
+ } else {
+ Map()
+ }
+ }
+
private def loadData(
sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel,
@@ -593,7 +587,7 @@ case class CarbonLoadDataCommand(
loadDataFrame,
operationContext, LOGGER)
} else {
- CarbonDataRDDFactory.loadCarbonData(
+ val loadResult = CarbonDataRDDFactory.loadCarbonData(
sparkSession.sqlContext,
carbonLoadModel,
columnar,
@@ -604,6 +598,8 @@ case class CarbonLoadDataCommand(
loadDataFrame,
updateModel,
operationContext)
+ val info = makeAuditInfo(loadResult)
+ setAuditInfo(info)
}
rows
}
@@ -1132,4 +1128,11 @@ case class CarbonLoadDataCommand(
(dataFrameWithTupleId)
}
+ override protected def opName: String = {
+ if (isOverwriteTable) {
+ "LOAD DATA OVERWRITE"
+ } else {
+ "LOAD DATA"
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 3f68cc4..4a35e6e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -59,6 +59,7 @@ case class CarbonShowLoadsCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+ setAuditTable(carbonTable)
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
@@ -68,4 +69,6 @@ case class CarbonShowLoadsCommand(
showHistory
)
}
+
+ override protected def opName: String = "SHOW SEGMENTS"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 50b88c8..b35c285 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, Me
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -54,6 +53,7 @@ case class RefreshCarbonTableCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+ setAuditTable(databaseName, tableName)
// Steps
// 1. get table path
// 2. perform the below steps
@@ -92,7 +92,6 @@ case class RefreshCarbonTableCommand(
val msg = s"Table registration with Database name [$databaseName] and Table name " +
s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
s" not copied under database [$databaseName]"
- Audit.log(LOGGER, msg)
throwMetadataException(databaseName, tableName, msg)
}
// 2.2.1 Register the aggregate tables to hive
@@ -104,18 +103,7 @@ case class RefreshCarbonTableCommand(
tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
registerAllPartitionsToHive(identifier, sparkSession)
}
- } else {
- Audit.log(LOGGER,
- s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
- s"failed." +
- s"Table [$tableName] either non carbon table or stale carbon table under database " +
- s"[$databaseName]")
}
- } else {
- Audit.log(LOGGER,
- s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
- s"failed." +
- s"Table [$tableName] either already exists or registered under database [$databaseName]")
}
// update the schema modified time
metaStore.updateAndTouchSchemasUpdatedTime()
@@ -185,8 +173,6 @@ case class RefreshCarbonTableCommand(
OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
.run(sparkSession)
- Audit.log(LOGGER, s"Table registration with Database name [$dbName] and Table name " +
- s"[$tableName] is successful.")
} catch {
case e: AnalysisException => throw e
case e: Exception =>
@@ -288,4 +274,6 @@ case class RefreshCarbonTableCommand(
AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession)
}
}
+
+ override protected def opName: String = "REFRESH TABLE"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 053937b..70a4350 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -21,14 +21,11 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
-import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
@@ -48,6 +45,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+ setAuditTable(carbonTable)
+ setAuditInfo(Map("plan" -> plan.simpleString))
if (!carbonTable.getTableInfo.isTransactionalTable) {
throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
}
@@ -81,8 +80,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
var lockStatus = false
try {
lockStatus = metadataLock.lockWithRetries()
- Audit.log(LOGGER, s" Delete data request has been received " +
- s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
if (lockStatus) {
LOGGER.info("Successfully able to get the table metadata file lock")
} else {
@@ -140,4 +137,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
}
Seq.empty
}
+
+ override protected def opName: String = "DELETE DATA"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 31e1779..0f23081 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -60,6 +60,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
return Seq.empty
}
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+ setAuditTable(carbonTable)
+ setAuditInfo(Map("plan" -> plan.simpleString))
columns.foreach { col =>
val dataType = carbonTable.getColumnByName(tableName, col).getColumnSchema.getDataType
if (dataType.isComplexType) {
@@ -276,4 +278,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
Seq.empty
}
+
+ override protected def opName: String = "UPDATE DATA"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index d118539..0f68004 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -32,8 +32,6 @@ import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.carbondata.api.CarbonStore.LOGGER
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
@@ -166,7 +164,6 @@ object DeleteExecution {
} else {
// In case of failure , clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
- Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
val errorMsg =
"Delete data operation is failed due to failure in creating delete delta file for " +
"segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
@@ -201,19 +198,14 @@ object DeleteExecution {
listOfSegmentToBeMarkedDeleted)
) {
LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
- Audit.log(LOGGER, s"Delete data operation is successful for ${ database }.${ tableName }")
- }
- else {
+ } else {
// In case of failure , clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
val errorMessage = "Delete data operation is failed due to failure " +
"in table status updation."
- Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
LOGGER.error("Delete data operation is failed due to failure in table status updation.")
executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
executorErrors.errorMsg = errorMessage
- // throw new Exception(errorMessage)
}
}
@@ -290,12 +282,10 @@ object DeleteExecution {
deleteStatus = SegmentStatus.SUCCESS
} catch {
case e : MultipleMatchingException =>
- Audit.log(LOGGER, e.getMessage)
LOGGER.error(e.getMessage)
// dont throw exception here.
case e: Exception =>
val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
- Audit.log(LOGGER, errorMsg)
LOGGER.error(errorMsg + e.getMessage)
throw e
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 3472d8a..6224d0d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -130,7 +129,6 @@ object HorizontalCompaction {
}
LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].")
- Audit.log(LOG, s"Horizontal Update Compaction operation started for [$db.$table].")
try {
// Update Compaction.
@@ -154,7 +152,6 @@ object HorizontalCompaction {
s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
}
LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
- Audit.log(LOG, s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
}
/**
@@ -180,7 +177,6 @@ object HorizontalCompaction {
}
LOG.info(s"Horizontal Delete Compaction operation started for [$db.$table].")
- Audit.log(LOG, s"Horizontal Delete Compaction operation started for [$db.$table].")
try {
@@ -225,7 +221,6 @@ object HorizontalCompaction {
timestamp.toString,
segmentUpdateStatusManager)
if (updateStatus == false) {
- Audit.log(LOG, s"Delete Compaction data operation is failed for [$db.$table].")
LOG.error("Delete Compaction data operation is failed.")
throw new HorizontalCompactionException(
s"Horizontal Delete Compaction Failed for [$db.$table] ." +
@@ -233,7 +228,6 @@ object HorizontalCompaction {
}
else {
LOG.info(s"Horizontal Delete Compaction operation completed for [$db.$table].")
- Audit.log(LOG, s"Horizontal Delete Compaction operation completed for [$db.$table].")
}
}
catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
index 4224efa..f7f76b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.command
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.spark.sql._
@@ -24,6 +25,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.util.Auditor
import org.apache.carbondata.spark.exception.ProcessMetaDataException
object Checker {
@@ -61,20 +64,71 @@ trait DataProcessOperation {
}
/**
+ * A utility that runs the command and records it in the audit log
+ */
+trait Auditable {
+ // operation id that will be written in audit log
+ private val operationId: String = String.valueOf(System.nanoTime())
+
+ // extra info to be written in audit log, set by the concrete command subclass
+ var auditInfo: Map[String, String] = _
+
+ // holds the dbName and tableName for which this command is executed;
+ // used for audit log, set by the concrete command subclass
+ private var table: String = _
+
+ // implemented by subclass; returns the operation name that is recorded in the audit log
+ protected def opName: String
+
+ protected def opTime(startTime: Long) = s"${System.currentTimeMillis() - startTime} ms"
+
+ protected def setAuditTable(dbName: String, tableName: String): Unit =
+ table = s"$dbName.$tableName"
+
+ protected def setAuditTable(carbonTable: CarbonTable): Unit =
+ table = s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}"
+
+ protected def setAuditInfo(map: Map[String, String]): Unit = auditInfo = map
+
+ /**
+ * Run the passed command and record the audit log.
+ * Two audit log entries are output: one at operation start, another at success/failure
+ * @param runCmd command to run
+ * @param spark session
+ * @return command result
+ */
+ protected def runWithAudit(runCmd: (SparkSession => Seq[Row]), spark: SparkSession): Seq[Row] = {
+ val start = System.currentTimeMillis()
+ Auditor.logOperationStart(opName, operationId)
+ val rows = try {
+ runCmd(spark)
+ } catch {
+ case e: Throwable =>
+ val map = Map("Exception" -> e.getClass.getName, "Message" -> e.getMessage)
+ Auditor.logOperationEnd(opName, operationId, false, table, opTime(start), map.asJava)
+ throw e
+ }
+ Auditor.logOperationEnd(opName, operationId, true, table, opTime(start),
+ if (auditInfo != null) auditInfo.asJava else new java.util.HashMap[String, String]())
+ rows
+ }
+}
+
+/**
* Command that modifies metadata(schema, table_status, etc) only without processing data
*/
-abstract class MetadataCommand extends RunnableCommand with MetadataProcessOpeation {
+abstract class MetadataCommand extends RunnableCommand with MetadataProcessOpeation with Auditable {
override def run(sparkSession: SparkSession): Seq[Row] = {
- processMetadata(sparkSession)
+ runWithAudit(processMetadata, sparkSession)
}
}
/**
* Command that process data only without modifying metadata
*/
-abstract class DataCommand extends RunnableCommand with DataProcessOperation {
+abstract class DataCommand extends RunnableCommand with DataProcessOperation with Auditable {
override def run(sparkSession: SparkSession): Seq[Row] = {
- processData(sparkSession)
+ runWithAudit(processData, sparkSession)
}
}
@@ -84,17 +138,19 @@ abstract class DataCommand extends RunnableCommand with DataProcessOperation {
* if process data failed.
*/
abstract class AtomicRunnableCommand
- extends RunnableCommand with MetadataProcessOpeation with DataProcessOperation {
+ extends RunnableCommand with MetadataProcessOpeation with DataProcessOperation with Auditable {
override def run(sparkSession: SparkSession): Seq[Row] = {
- processMetadata(sparkSession)
- try {
- processData(sparkSession)
- } catch {
- case e: Exception =>
- undoMetadata(sparkSession, e)
- throw e
- }
+ runWithAudit(spark => {
+ processMetadata(spark)
+ try {
+ processData(spark)
+ } catch {
+ case e: Exception =>
+ undoMetadata(spark, e)
+ throw e
+ }
+ }, sparkSession)
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 6c8b0b0..dcaac98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -55,6 +55,8 @@ case class CarbonAlterTableAddHivePartitionCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ setAuditTable(table)
+ setAuditInfo(Map("partition" -> partitionSpecsAndLocs.mkString(", ")))
if (table.isHivePartitionTable) {
if (table.isChildDataMap) {
throw new UnsupportedOperationException("Cannot add partition directly on aggregate tables")
@@ -156,4 +158,5 @@ case class CarbonAlterTableAddHivePartitionCommand(
Seq.empty[Row]
}
+ override protected def opName: String = "ADD HIVE PARTITION"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 1e987b0..a4629f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -68,6 +68,8 @@ case class CarbonAlterTableDropHivePartitionCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+ setAuditTable(table)
+ setAuditInfo(Map("partition" -> specs.mkString(",")))
if (table.isHivePartitionTable) {
var locks = List.empty[ICarbonLock]
try {
@@ -204,4 +206,5 @@ case class CarbonAlterTableDropHivePartitionCommand(
Seq.empty[Row]
}
+ override protected def opName: String = "DROP HIVE PARTITION"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index c230322..d4a2d7f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
@@ -58,6 +57,8 @@ case class CarbonAlterTableDropPartitionCommand(
}
val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
val tableName = model.tableName
+ setAuditTable(dbName, tableName)
+ setAuditInfo(Map("partition" -> model.partitionId))
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
@@ -169,7 +170,6 @@ case class CarbonAlterTableDropPartitionCommand(
LOGGER.info("Locks released after alter table drop partition action.")
}
LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
- Audit.log(LOGGER, s"Alter table drop partition is successful for table $dbName.$tableName")
Seq.empty
}
@@ -178,8 +178,6 @@ case class CarbonAlterTableDropPartitionCommand(
carbonLoadModel: CarbonLoadModel,
dropWithData: Boolean,
oldPartitionIds: List[Int]): Unit = {
- Audit.log(LOGGER, s"Drop partition request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startDropThreads(
sqlContext,
@@ -237,6 +235,8 @@ case class CarbonAlterTableDropPartitionCommand(
}
}
}
+
+ override protected def opName: String = "DROP CUSTOM PARTITION"
}
case class dropPartitionThread(sqlContext: SQLContext,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 8b337c6..18c47a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -62,6 +61,8 @@ case class CarbonAlterTableSplitPartitionCommand(
val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val tableName = splitPartitionModel.tableName
+ setAuditTable(dbName, tableName)
+ setAuditInfo(Map("partition" -> splitPartitionModel.partitionId))
val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
val tablePath = relation.carbonTable.getTablePath
@@ -187,8 +188,6 @@ case class CarbonAlterTableSplitPartitionCommand(
LOGGER.info("Locks released after alter table add/split partition action.")
if (success) {
LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
- Audit.log(LOGGER,
- s"Alter table add/split partition is successful for table $dbName.$tableName")
}
}
Seq.empty
@@ -200,8 +199,6 @@ case class CarbonAlterTableSplitPartitionCommand(
carbonLoadModel: CarbonLoadModel,
oldPartitionIdList: List[Int]
): Unit = {
- Audit.log(LOGGER, s"Add partition request received for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startSplitThreads(sqlContext,
carbonLoadModel,
@@ -257,6 +254,8 @@ case class CarbonAlterTableSplitPartitionCommand(
}
}
}
+
+ override protected def opName: String = "SPLIT CUSTOM PARTITION"
}
case class SplitThread(sqlContext: SQLContext,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
index 9419b00..2915981 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
@@ -39,6 +39,7 @@ private[sql] case class CarbonShowCarbonPartitionsCommand(
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
val carbonTable = relation.carbonTable
+ setAuditTable(carbonTable)
val partitionInfo = carbonTable.getPartitionInfo(
carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
if (partitionInfo == null) {
@@ -51,4 +52,6 @@ private[sql] case class CarbonShowCarbonPartitionsCommand(
LOGGER.info("partition column name:" + columnName)
CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
}
+
+ override protected def opName: String = "SHOW CUSTOM PARTITION"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 1f1e7bd..719ed4a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -44,7 +43,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
val tableName = alterTableAddColumnsModel.tableName
val dbName = alterTableAddColumnsModel.databaseName
.getOrElse(sparkSession.catalog.currentDatabase)
- Audit.log(LOGGER, s"Alter table add columns request has been received for $dbName.$tableName")
+ setAuditTable(dbName, tableName)
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
var locks = List.empty[ICarbonLock]
var timeStamp = 0L
@@ -82,6 +81,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
wrapperTableInfo,
carbonTable.getAbsoluteTableIdentifier,
sparkSession.sparkContext).process
+ setAuditInfo(Map(
+ "newColumn" -> newCols.map(x => s"${x.getColumnName}:${x.getDataType}").mkString(",")))
// generate dictionary files for the newly added columns
new AlterTableAddColumnRDD(sparkSession,
newCols,
@@ -105,10 +106,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
carbonTable, alterTableAddColumnsModel)
OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
- Audit.log(LOGGER, s"Alter table for add columns is successful for table $dbName.$tableName")
} catch {
case e: Exception =>
- LOGGER.error("Alter table add columns failed", e)
if (newCols.nonEmpty) {
LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
new AlterTableDropColumnRDD(sparkSession,
@@ -124,4 +123,6 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
}
Seq.empty
}
+
+ override protected def opName: String = "ALTER TABLE ADD COLUMN"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 716b9c9..2bcd3aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -45,8 +44,10 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
val tableName = alterTableDataTypeChangeModel.tableName
val dbName = alterTableDataTypeChangeModel.databaseName
.getOrElse(sparkSession.catalog.currentDatabase)
- Audit.log(LOGGER,
- s"Alter table change data type request has been received for $dbName.$tableName")
+ setAuditTable(dbName, tableName)
+ setAuditInfo(Map(
+ "column" -> alterTableDataTypeChangeModel.columnName,
+ "newType" -> alterTableDataTypeChangeModel.dataTypeInfo.dataType))
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
var locks = List.empty[ICarbonLock]
// get the latest carbon table and check for column existence
@@ -70,16 +71,12 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
val columnName = alterTableDataTypeChangeModel.columnName
val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
- Audit.log(LOGGER, s"Alter table change data type request has failed. " +
- s"Column $columnName does not exist")
throwMetadataException(dbName, tableName, s"Column does not exist: $columnName")
}
val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
if (carbonColumn.size == 1) {
validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
} else {
- Audit.log(LOGGER, s"Alter table change data type request has failed. " +
- s"Column $columnName is invalid")
throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
}
// read the latest schema file
@@ -118,11 +115,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
alterTableDataTypeChangeModel)
OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
- Audit.log(LOGGER,
- s"Alter table for data type change is successful for table $dbName.$tableName")
} catch {
case e: Exception =>
- LOGGER.error("Alter table change datatype failed : " + e.getMessage)
if (carbonTable != null) {
AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
}
@@ -181,4 +175,6 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
s"Only Int and Decimal data types are allowed for modification")
}
}
+
+ override protected def opName: String = "ALTER TABLE CHANGE DATA TYPE"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index d601ed6..ccf9e54 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -46,7 +45,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
val tableName = alterTableDropColumnModel.tableName
val dbName = alterTableDropColumnModel.databaseName
.getOrElse(sparkSession.catalog.currentDatabase)
- Audit.log(LOGGER, s"Alter table drop columns request has been received for $dbName.$tableName")
+ setAuditTable(dbName, tableName)
+ setAuditInfo(Map("column" -> alterTableDropColumnModel.columns.mkString(", ")))
var locks = List.empty[ICarbonLock]
var timeStamp = 0L
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
@@ -161,10 +161,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPostEvent, operationContext)
LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
- Audit.log(LOGGER, s"Alter table for drop columns is successful for table $dbName.$tableName")
} catch {
case e: Exception =>
- LOGGER.error("Alter table drop columns failed : " + e.getMessage)
if (carbonTable != null) {
AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
}
@@ -176,4 +174,6 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
}
Seq.empty
}
+
+ override protected def opName: String = "ALTER TABLE DROP COLUMN"
}