You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/11/09 00:50:12 UTC
[4/6] carbondata git commit: [CARBONDATA-3064] Support separate audit log
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index a1c68a3..c64f50b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -28,7 +28,6 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
@@ -48,6 +47,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
val newTableIdentifier = alterTableRenameModel.newTableIdentifier
val oldDatabaseName = oldTableIdentifier.database
.getOrElse(sparkSession.catalog.currentDatabase)
+ setAuditTable(oldDatabaseName, oldTableIdentifier.table)
+ setAuditInfo(Map("newName" -> alterTableRenameModel.newTableIdentifier.table))
val newDatabaseName = newTableIdentifier.database
.getOrElse(sparkSession.catalog.currentDatabase)
if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
@@ -60,15 +61,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
}
val oldTableName = oldTableIdentifier.table.toLowerCase
val newTableName = newTableIdentifier.table.toLowerCase
- Audit.log(LOGGER, s"Rename table request has been received for $oldDatabaseName.$oldTableName")
LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
val relation: CarbonRelation =
metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
.asInstanceOf[CarbonRelation]
if (relation == null) {
- Audit.log(LOGGER, s"Rename table request has failed. " +
- s"Table $oldDatabaseName.$oldTableName does not exist")
throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist")
}
@@ -162,13 +160,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
sparkSession.catalog.refreshTable(newIdentifier.quotedString)
- Audit.log(LOGGER, s"Table $oldTableName has been successfully renamed to $newTableName")
LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
} catch {
case e: ConcurrentOperationException =>
throw e
case e: Exception =>
- LOGGER.error("Rename table failed: " + e.getMessage, e)
if (carbonTable != null) {
AlterTableUtil.revertRenameTableChanges(
newTableName,
@@ -182,4 +178,5 @@ private[sql] case class CarbonAlterTableRenameCommand(
Seq.empty
}
+ override protected def opName: String = "ALTER TABLE RENAME TABLE"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
index 51c0e6e..b1e7e33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
@@ -29,17 +29,18 @@ private[sql] case class CarbonAlterTableSetCommand(
isView: Boolean)
extends MetadataCommand {
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processMetadata(sparkSession)
- }
-
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ setAuditTable(tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase),
+ tableIdentifier.table)
AlterTableUtil.modifyTableProperties(
tableIdentifier,
properties,
Nil,
set = true)(sparkSession,
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ setAuditInfo(properties)
Seq.empty
}
+
+ override protected def opName: String = "ALTER TABLE SET"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
index 2490f9e..361ba1d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
@@ -29,16 +29,17 @@ private[sql] case class CarbonAlterTableUnsetCommand(
propKeys: Seq[String],
ifExists: Boolean,
isView: Boolean)
- extends RunnableCommand with MetadataProcessOpeation {
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- processMetadata(sparkSession)
- }
+ extends MetadataCommand {
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ setAuditTable(tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase),
+ tableIdentifier.table)
AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String],
propKeys, false)(sparkSession,
sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+ setAuditInfo(Map("unset" -> propKeys.mkString(", ")))
Seq.empty
}
+
+ override protected def opName: String = "ALTER TABLE UNSET"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
index 8d6a4cc..90da68a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
@@ -56,4 +56,6 @@ case class CarbonGetTableDetailCommand(
AttributeReference("table size", LongType, nullable = false)(),
AttributeReference("last modified time", LongType, nullable = false)())
}
+
+ override protected def opName: String = "GET TABLE DETAIL"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index 1f8bde2..a95d6a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -55,6 +55,8 @@ case class CarbonCreateStreamCommand(
AttributeReference("Status", StringType, nullable = false)())
override def processData(sparkSession: SparkSession): Seq[Row] = {
+ setAuditTable(CarbonEnv.getDatabaseName(sinkDbName)(sparkSession), sinkTableName)
+ setAuditInfo(Map("streamName" -> streamName, "query" -> query) ++ optionMap)
val inputQuery = sparkSession.sql(query)
val sourceTableSeq = inputQuery.logicalPlan collect {
case r: LogicalRelation
@@ -286,4 +288,5 @@ case class CarbonCreateStreamCommand(
Util.convertToSparkSchema(sourceTable, sortedCols)
}
+ override protected def opName: String = "CREATE STREAM"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
index 82b84ef..49f1d11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
@@ -30,7 +30,10 @@ case class CarbonDropStreamCommand(
ifExists: Boolean
) extends MetadataCommand {
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+ setAuditInfo(Map("streamName" -> streamName))
StreamJobManager.stopStream(streamName, ifExists)
Seq.empty
}
+
+ override protected def opName: String = "DROP STREAM"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
index 49c2ffb..ee749b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
@@ -49,6 +49,7 @@ case class CarbonShowStreamsCommand(
case None => StreamJobManager.getAllJobs.toSeq
case Some(table) =>
val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
+ setAuditTable(carbonTable)
StreamJobManager.getAllJobs.filter { job =>
job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
@@ -73,4 +74,6 @@ case class CarbonShowStreamsCommand(
)
}
}
+
+ override protected def opName: String = "SHOW STREAMS"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index 3252f1d..54be619 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -24,9 +24,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
-import org.apache.carbondata.api.CarbonStore.LOGGER
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.metadata.schema.table.TableInfo
/**
@@ -47,7 +44,6 @@ case class CarbonCreateTableAsSelectCommand(
var loadCommand: CarbonInsertIntoCommand = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = tableInfo.getFactTable.getTableName
var isTableCreated = false
var databaseOpt: Option[String] = None
@@ -55,14 +51,12 @@ case class CarbonCreateTableAsSelectCommand(
databaseOpt = Some(tableInfo.getDatabaseName)
}
val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
- Audit.log(LOGGER, s"Request received for CTAS for $dbName.$tableName")
+ setAuditTable(dbName, tableName)
+ setAuditInfo(Map("query" -> query.simpleString))
// check if table already exists
if (sparkSession.sessionState.catalog.listTables(dbName)
.exists(_.table.equalsIgnoreCase(tableName))) {
if (!ifNotExistsSet) {
- Audit.log(LOGGER,
- s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
- s"Table [$tableName] already exists under database [$dbName]")
throw new TableAlreadyExistsException(dbName, tableName)
}
} else {
@@ -78,7 +72,6 @@ case class CarbonCreateTableAsSelectCommand(
databaseOpt = Some(tableInfo.getDatabaseName)
}
val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val carbonDataSourceHadoopRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.createCarbonDataSourceHadoopRelation(sparkSession,
TableIdentifier(tableName, Option(dbName)))
@@ -95,11 +88,7 @@ case class CarbonCreateTableAsSelectCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (null != loadCommand) {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
loadCommand.processData(sparkSession)
- val carbonTable = loadCommand.relation.carbonTable
- Audit.log(LOGGER, s"CTAS operation completed successfully for " +
- s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
}
Seq.empty
}
@@ -117,4 +106,6 @@ case class CarbonCreateTableAsSelectCommand(
Option(dbName), tableName).run(sparkSession)
Seq.empty
}
+
+ override protected def opName: String = "CREATE TABLE AS SELECT"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 5d039bf..ca39931 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -19,15 +19,12 @@ package org.apache.spark.sql.execution.command.table
import scala.collection.JavaConverters._
-import org.apache.commons.lang3.StringUtils
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
import org.apache.spark.sql.execution.command.MetadataCommand
-import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -36,7 +33,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.spark.util.CarbonSparkUtil
@@ -59,17 +56,16 @@ case class CarbonCreateTableCommand(
databaseOpt = Some(tableInfo.getDatabaseName)
}
val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+ setAuditTable(dbName, tableName)
+ setAuditInfo(tableInfo.getFactTable.getTableProperties.asScala.toMap
+ ++ Map("external" -> isExternal.toString))
// set dbName and tableUnique Name in the table info
tableInfo.setDatabaseName(dbName)
tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
- Audit.log(LOGGER, s"Creating Table with Database name [$dbName] and Table name [$tableName]")
val isTransactionalTable = tableInfo.isTransactionalTable
if (sparkSession.sessionState.catalog.listTables(dbName)
.exists(_.table.equalsIgnoreCase(tableName))) {
if (!ifNotExistsSet) {
- Audit.log(LOGGER,
- s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
- s"Table [$tableName] already exists under database [$dbName]")
throw new TableAlreadyExistsException(dbName, tableName)
}
} else {
@@ -180,16 +176,15 @@ case class CarbonCreateTableCommand(
case _: Exception => // No operation
}
val msg = s"Create table'$tableName' in database '$dbName' failed"
- Audit.log(LOGGER, msg.concat(", ").concat(e.getMessage))
- LOGGER.error(msg, e)
throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
}
}
val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
- Audit.log(LOGGER, s"Table created with Database name [$dbName] and Table name [$tableName]")
}
Seq.empty
}
+
+ override protected def opName: String = "CREATE TABLE"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index b513c1f..8edc854 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -45,6 +45,7 @@ private[sql] case class CarbonDescribeFormattedCommand(
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
.lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ setAuditTable(relation.databaseName, relation.tableName)
val mapper = new ObjectMapper()
val colProps = StringBuilder.newBuilder
val dims = relation.metaData.dims.map(x => x.toLowerCase)
@@ -267,4 +268,6 @@ private[sql] case class CarbonDescribeFormattedCommand(
Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s")
}
}
+
+ override protected def opName: String = "DESC FORMATTED"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 37ab04f..34d9b75 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -54,6 +53,7 @@ case class CarbonDropTableCommand(
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+ setAuditTable(dbName, tableName)
val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
@@ -71,7 +71,6 @@ case class CarbonDropTableCommand(
if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
throw new ConcurrentOperationException(carbonTable, "loading", "drop table")
}
- Audit.log(LOGGER, s"Deleting table [$tableName] under database [$dbName]")
if (carbonTable.isStreamingSink) {
// streaming table should acquire streaming.lock
carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK)
@@ -142,8 +141,6 @@ case class CarbonDropTableCommand(
ifExistsSet,
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
- Audit.log(LOGGER, s"Deleted table [$tableName] under database [$dbName]")
-
} catch {
case ex: NoSuchTableException =>
LOGGER.error(ex.getLocalizedMessage, ex)
@@ -200,4 +197,5 @@ case class CarbonDropTableCommand(
Seq.empty
}
+ override protected def opName: String = "DROP TABLE"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
index cb402c7..8939c6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
@@ -32,7 +32,7 @@ case class CarbonExplainCommand(
) extends MetadataCommand {
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val explainCommand = child.asInstanceOf[ExplainCommand]
-
+ setAuditInfo(Map("query" -> explainCommand.logicalPlan.simpleString))
val isCommand = explainCommand.logicalPlan match {
case _: Command => true
case Union(childern) if childern.forall(_.isInstanceOf[Command]) => true
@@ -61,5 +61,7 @@ case class CarbonExplainCommand(
ExplainCollector.remove()
}
}
+
+ override protected def opName: String = "EXPLAIN"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
index 534703d..d50b766 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
@@ -63,4 +63,5 @@ private[sql] case class CarbonShowTablesCommand ( databaseName: Option[String],
}
+ override protected def opName: String = "SHOW TABLES"
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 16d974e..b4dd1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -256,7 +256,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
AttributeSet(handledPredicates.flatMap(_.references)) --
(projectSet ++ unhandledSet).map(relation.attributeMap)
} catch {
- case e => throw new CarbonPhysicalPlanException
+ case e: Throwable => throw new CarbonPhysicalPlanException
}
}
// Combines all Catalyst filter `Expression`s that are either not convertible to data source
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index c681b62..e26163f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -63,11 +63,11 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
}
case class CarbonSetCommand(command: SetCommand)
- extends RunnableCommand {
+ extends MetadataCommand {
override val output: Seq[Attribute] = command.output
- override def run(sparkSession: SparkSession): Seq[Row] = {
+ override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val sessionParams = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
command.kv match {
case Some((key, Some(value))) =>
@@ -86,6 +86,9 @@ case class CarbonSetCommand(command: SetCommand)
}
command.run(sparkSession)
}
+
+ override protected def opName: String = "SET"
+
}
object CarbonSetCommand {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 39e2f30..4ce4459 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -39,7 +39,6 @@ import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -548,11 +547,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
val colName = name.substring(14)
if (name.startsWith("default.value.") &&
fields.count(p => p.column.equalsIgnoreCase(colName)) == 1) {
- LOGGER.error(s"Duplicate default value exist for new column: ${ colName }")
- Audit.log(LOGGER,
- s"Validation failed for Create/Alter Table Operation " +
- s"for ${ table }. " +
- s"Duplicate default value exist for new column: ${ colName }")
sys.error(s"Duplicate default value exist for new column: ${ colName }")
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 27443a8..3faa111 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -29,10 +29,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
-import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
@@ -43,9 +41,8 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.format.{Encoding, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -70,8 +67,6 @@ object AlterTableUtil {
.lookupRelation(Option(dbName), tableName)(sparkSession)
.asInstanceOf[CarbonRelation]
if (relation == null) {
- Audit.log(LOGGER, s"Alter table request has failed. " +
- s"Table $dbName.$tableName does not exist")
sys.error(s"Table $dbName.$tableName does not exist")
}
// acquire the lock first
@@ -294,7 +289,6 @@ object AlterTableUtil {
(sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
val tableName = tableIdentifier.table
val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- Audit.log(LOGGER, s"Alter table newProperties request has been received for $dbName.$tableName")
val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
var locks = List.empty[ICarbonLock]
try {
@@ -383,10 +377,8 @@ object AlterTableUtil {
propKeys,
set)
LOGGER.info(s"Alter table newProperties is successful for table $dbName.$tableName")
- Audit.log(LOGGER, s"Alter table newProperties is successful for table $dbName.$tableName")
} catch {
case e: Exception =>
- LOGGER.error("Alter table newProperties failed", e)
sys.error(s"Alter table newProperties operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion