You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/11/09 00:50:12 UTC

[4/6] carbondata git commit: [CARBONDATA-3064] Support separate audit log

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index a1c68a3..c64f50b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -28,7 +28,6 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
@@ -48,6 +47,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
     val newTableIdentifier = alterTableRenameModel.newTableIdentifier
     val oldDatabaseName = oldTableIdentifier.database
       .getOrElse(sparkSession.catalog.currentDatabase)
+    setAuditTable(oldDatabaseName, oldTableIdentifier.table)
+    setAuditInfo(Map("newName" -> alterTableRenameModel.newTableIdentifier.table))
     val newDatabaseName = newTableIdentifier.database
       .getOrElse(sparkSession.catalog.currentDatabase)
     if (!oldDatabaseName.equalsIgnoreCase(newDatabaseName)) {
@@ -60,15 +61,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
     }
     val oldTableName = oldTableIdentifier.table.toLowerCase
     val newTableName = newTableIdentifier.table.toLowerCase
-    Audit.log(LOGGER, s"Rename table request has been received for $oldDatabaseName.$oldTableName")
     LOGGER.info(s"Rename table request has been received for $oldDatabaseName.$oldTableName")
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation: CarbonRelation =
       metastore.lookupRelation(oldTableIdentifier.database, oldTableName)(sparkSession)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
-      Audit.log(LOGGER, s"Rename table request has failed. " +
-                   s"Table $oldDatabaseName.$oldTableName does not exist")
       throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist")
     }
 
@@ -162,13 +160,11 @@ private[sql] case class CarbonAlterTableRenameCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
 
       sparkSession.catalog.refreshTable(newIdentifier.quotedString)
-      Audit.log(LOGGER, s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
       case e: ConcurrentOperationException =>
         throw e
       case e: Exception =>
-        LOGGER.error("Rename table failed: " + e.getMessage, e)
         if (carbonTable != null) {
           AlterTableUtil.revertRenameTableChanges(
             newTableName,
@@ -182,4 +178,5 @@ private[sql] case class CarbonAlterTableRenameCommand(
     Seq.empty
   }
 
+  override protected def opName: String = "ALTER TABLE RENAME TABLE"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
index 51c0e6e..b1e7e33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableSetCommand.scala
@@ -29,17 +29,18 @@ private[sql] case class CarbonAlterTableSetCommand(
     isView: Boolean)
   extends MetadataCommand {
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processMetadata(sparkSession)
-  }
-
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    setAuditTable(tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase),
+      tableIdentifier.table)
     AlterTableUtil.modifyTableProperties(
       tableIdentifier,
       properties,
       Nil,
       set = true)(sparkSession,
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+    setAuditInfo(properties)
     Seq.empty
   }
+
+  override protected def opName: String = "ALTER TABLE SET"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
index 2490f9e..361ba1d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
@@ -29,16 +29,17 @@ private[sql] case class CarbonAlterTableUnsetCommand(
     propKeys: Seq[String],
     ifExists: Boolean,
     isView: Boolean)
-  extends RunnableCommand with MetadataProcessOpeation {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processMetadata(sparkSession)
-  }
+  extends MetadataCommand {
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    setAuditTable(tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase),
+      tableIdentifier.table)
     AlterTableUtil.modifyTableProperties(tableIdentifier, Map.empty[String, String],
       propKeys, false)(sparkSession,
       sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog])
+    setAuditInfo(Map("unset" -> propKeys.mkString(", ")))
     Seq.empty
   }
+
+  override protected def opName: String = "ALTER TABLE UNSET"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
index 8d6a4cc..90da68a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonGetTableDetailCommand.scala
@@ -56,4 +56,6 @@ case class CarbonGetTableDetailCommand(
       AttributeReference("table size", LongType, nullable = false)(),
       AttributeReference("last modified time", LongType, nullable = false)())
   }
+
+  override protected def opName: String = "GET TABLE DETAIL"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
index 1f8bde2..a95d6a4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonCreateStreamCommand.scala
@@ -55,6 +55,8 @@ case class CarbonCreateStreamCommand(
       AttributeReference("Status", StringType, nullable = false)())
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
+    setAuditTable(CarbonEnv.getDatabaseName(sinkDbName)(sparkSession), sinkTableName)
+    setAuditInfo(Map("streamName" -> streamName, "query" -> query) ++ optionMap)
     val inputQuery = sparkSession.sql(query)
     val sourceTableSeq = inputQuery.logicalPlan collect {
       case r: LogicalRelation
@@ -286,4 +288,5 @@ case class CarbonCreateStreamCommand(
     Util.convertToSparkSchema(sourceTable, sortedCols)
   }
 
+  override protected def opName: String = "CREATE STREAM"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
index 82b84ef..49f1d11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonDropStreamCommand.scala
@@ -30,7 +30,10 @@ case class CarbonDropStreamCommand(
     ifExists: Boolean
 ) extends MetadataCommand {
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    setAuditInfo(Map("streamName" -> streamName))
     StreamJobManager.stopStream(streamName, ifExists)
     Seq.empty
   }
+
+  override protected def opName: String = "DROP STREAM"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
index 49c2ffb..ee749b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/stream/CarbonShowStreamsCommand.scala
@@ -49,6 +49,7 @@ case class CarbonShowStreamsCommand(
       case None => StreamJobManager.getAllJobs.toSeq
       case Some(table) =>
         val carbonTable = CarbonEnv.getCarbonTable(table.database, table.table)(sparkSession)
+        setAuditTable(carbonTable)
         StreamJobManager.getAllJobs.filter { job =>
           job.sinkTable.equalsIgnoreCase(carbonTable.getTableName) &&
           job.sinkDb.equalsIgnoreCase(carbonTable.getDatabaseName)
@@ -73,4 +74,6 @@ case class CarbonShowStreamsCommand(
       )
     }
   }
+
+  override protected def opName: String = "SHOW STREAMS"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
index 3252f1d..54be619 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableAsSelectCommand.scala
@@ -24,9 +24,6 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.management.CarbonInsertIntoCommand
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 
 /**
@@ -47,7 +44,6 @@ case class CarbonCreateTableAsSelectCommand(
   var loadCommand: CarbonInsertIntoCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = tableInfo.getFactTable.getTableName
     var isTableCreated = false
     var databaseOpt: Option[String] = None
@@ -55,14 +51,12 @@ case class CarbonCreateTableAsSelectCommand(
       databaseOpt = Some(tableInfo.getDatabaseName)
     }
     val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
-    Audit.log(LOGGER, s"Request received for CTAS for $dbName.$tableName")
+    setAuditTable(dbName, tableName)
+    setAuditInfo(Map("query" -> query.simpleString))
     // check if table already exists
     if (sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
       if (!ifNotExistsSet) {
-        Audit.log(LOGGER,
-          s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
-          s"Table [$tableName] already exists under database [$dbName]")
         throw new TableAlreadyExistsException(dbName, tableName)
       }
     } else {
@@ -78,7 +72,6 @@ case class CarbonCreateTableAsSelectCommand(
         databaseOpt = Some(tableInfo.getDatabaseName)
       }
       val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
-      val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
       val carbonDataSourceHadoopRelation = CarbonEnv.getInstance(sparkSession).carbonMetastore
         .createCarbonDataSourceHadoopRelation(sparkSession,
           TableIdentifier(tableName, Option(dbName)))
@@ -95,11 +88,7 @@ case class CarbonCreateTableAsSelectCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (null != loadCommand) {
-      val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
       loadCommand.processData(sparkSession)
-      val carbonTable = loadCommand.relation.carbonTable
-      Audit.log(LOGGER, s"CTAS operation completed successfully for " +
-                   s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
     }
     Seq.empty
   }
@@ -117,4 +106,6 @@ case class CarbonCreateTableAsSelectCommand(
       Option(dbName), tableName).run(sparkSession)
     Seq.empty
   }
+
+  override protected def opName: String = "CREATE TABLE AS SELECT"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 5d039bf..ca39931 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -19,15 +19,12 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.JavaConverters._
 
-import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.MetadataCommand
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -36,7 +33,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
@@ -59,17 +56,16 @@ case class CarbonCreateTableCommand(
       databaseOpt = Some(tableInfo.getDatabaseName)
     }
     val dbName = CarbonEnv.getDatabaseName(databaseOpt)(sparkSession)
+    setAuditTable(dbName, tableName)
+    setAuditInfo(tableInfo.getFactTable.getTableProperties.asScala.toMap
+                 ++ Map("external" -> isExternal.toString))
     // set dbName and tableUnique Name in the table info
     tableInfo.setDatabaseName(dbName)
     tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
-    Audit.log(LOGGER, s"Creating Table with Database name [$dbName] and Table name [$tableName]")
     val isTransactionalTable = tableInfo.isTransactionalTable
     if (sparkSession.sessionState.catalog.listTables(dbName)
       .exists(_.table.equalsIgnoreCase(tableName))) {
       if (!ifNotExistsSet) {
-        Audit.log(LOGGER,
-          s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
-          s"Table [$tableName] already exists under database [$dbName]")
         throw new TableAlreadyExistsException(dbName, tableName)
       }
     } else {
@@ -180,16 +176,15 @@ case class CarbonCreateTableCommand(
               case _: Exception => // No operation
             }
             val msg = s"Create table'$tableName' in database '$dbName' failed"
-            Audit.log(LOGGER, msg.concat(", ").concat(e.getMessage))
-            LOGGER.error(msg, e)
             throwMetadataException(dbName, tableName, msg.concat(", ").concat(e.getMessage))
         }
       }
       val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
         CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
       OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
-      Audit.log(LOGGER, s"Table created with Database name [$dbName] and Table name [$tableName]")
     }
     Seq.empty
   }
+
+  override protected def opName: String = "CREATE TABLE"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index b513c1f..8edc854 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -45,6 +45,7 @@ private[sql] case class CarbonDescribeFormattedCommand(
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tblIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+    setAuditTable(relation.databaseName, relation.tableName)
     val mapper = new ObjectMapper()
     val colProps = StringBuilder.newBuilder
     val dims = relation.metaData.dims.map(x => x.toLowerCase)
@@ -267,4 +268,6 @@ private[sql] case class CarbonDescribeFormattedCommand(
         Row(f"$name%-36s", f"$dataType%-80s", f"$comment%-72s")
     }
   }
+
+  override protected def opName: String = "DESC FORMATTED"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 37ab04f..34d9b75 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -54,6 +53,7 @@ case class CarbonDropTableCommand(
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
+    setAuditTable(dbName, tableName)
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
@@ -71,7 +71,6 @@ case class CarbonDropTableCommand(
       if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
         throw new ConcurrentOperationException(carbonTable, "loading", "drop table")
       }
-      Audit.log(LOGGER, s"Deleting table [$tableName] under database [$dbName]")
       if (carbonTable.isStreamingSink) {
         // streaming table should acquire streaming.lock
         carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK)
@@ -142,8 +141,6 @@ case class CarbonDropTableCommand(
           ifExistsSet,
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
-      Audit.log(LOGGER, s"Deleted table [$tableName] under database [$dbName]")
-
     } catch {
       case ex: NoSuchTableException =>
         LOGGER.error(ex.getLocalizedMessage, ex)
@@ -200,4 +197,5 @@ case class CarbonDropTableCommand(
     Seq.empty
   }
 
+  override protected def opName: String = "DROP TABLE"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
index cb402c7..8939c6a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonExplainCommand.scala
@@ -32,7 +32,7 @@ case class CarbonExplainCommand(
 ) extends MetadataCommand {
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val explainCommand = child.asInstanceOf[ExplainCommand]
-
+    setAuditInfo(Map("query" -> explainCommand.logicalPlan.simpleString))
     val isCommand = explainCommand.logicalPlan match {
       case _: Command => true
       case Union(childern) if childern.forall(_.isInstanceOf[Command]) => true
@@ -61,5 +61,7 @@ case class CarbonExplainCommand(
       ExplainCollector.remove()
     }
   }
+
+  override protected def opName: String = "EXPLAIN"
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
index 534703d..d50b766 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonShowTablesCommand.scala
@@ -63,4 +63,5 @@ private[sql] case class CarbonShowTablesCommand ( databaseName: Option[String],
 
   }
 
+  override protected def opName: String = "SHOW TABLES"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 16d974e..b4dd1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -256,7 +256,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         AttributeSet(handledPredicates.flatMap(_.references)) --
         (projectSet ++ unhandledSet).map(relation.attributeMap)
       } catch {
-        case e => throw new CarbonPhysicalPlanException
+        case e: Throwable => throw new CarbonPhysicalPlanException
       }
     }
     // Combines all Catalyst filter `Expression`s that are either not convertible to data source

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index c681b62..e26163f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -63,11 +63,11 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
 }
 
 case class CarbonSetCommand(command: SetCommand)
-  extends RunnableCommand {
+  extends MetadataCommand {
 
   override val output: Seq[Attribute] = command.output
 
-  override def run(sparkSession: SparkSession): Seq[Row] = {
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val sessionParams = CarbonEnv.getInstance(sparkSession).carbonSessionInfo.getSessionParams
     command.kv match {
       case Some((key, Some(value))) =>
@@ -86,6 +86,9 @@ case class CarbonSetCommand(command: SetCommand)
     }
     command.run(sparkSession)
   }
+
+  override protected def opName: String = "SET"
+
 }
 
 object CarbonSetCommand {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 39e2f30..4ce4459 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -39,7 +39,6 @@ import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
@@ -548,11 +547,6 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
               val colName = name.substring(14)
               if (name.startsWith("default.value.") &&
                   fields.count(p => p.column.equalsIgnoreCase(colName)) == 1) {
-                LOGGER.error(s"Duplicate default value exist for new column: ${ colName }")
-                Audit.log(LOGGER,
-                  s"Validation failed for Create/Alter Table Operation " +
-                  s"for ${ table }. " +
-                  s"Duplicate default value exist for new column: ${ colName }")
                 sys.error(s"Duplicate default value exist for new column: ${ colName }")
               }
             }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 27443a8..3faa111 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -29,10 +29,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder
@@ -43,9 +41,8 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.format.{Encoding, SchemaEvolutionEntry, TableInfo}
+import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
 
 
@@ -70,8 +67,6 @@ object AlterTableUtil {
         .lookupRelation(Option(dbName), tableName)(sparkSession)
         .asInstanceOf[CarbonRelation]
     if (relation == null) {
-      Audit.log(LOGGER, s"Alter table request has failed. " +
-                   s"Table $dbName.$tableName does not exist")
       sys.error(s"Table $dbName.$tableName does not exist")
     }
     // acquire the lock first
@@ -294,7 +289,6 @@ object AlterTableUtil {
     (sparkSession: SparkSession, catalog: CarbonSessionCatalog): Unit = {
     val tableName = tableIdentifier.table
     val dbName = tableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    Audit.log(LOGGER, s"Alter table newProperties request has been received for $dbName.$tableName")
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     try {
@@ -383,10 +377,8 @@ object AlterTableUtil {
         propKeys,
         set)
       LOGGER.info(s"Alter table newProperties is successful for table $dbName.$tableName")
-      Audit.log(LOGGER, s"Alter table newProperties is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error("Alter table newProperties failed", e)
         sys.error(s"Alter table newProperties operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion