Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/11/09 00:50:13 UTC

[5/6] carbondata git commit: [CARBONDATA-3064] Support separate audit log
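The hunks below replace scattered Audit.log calls with one structured audit record per command: each command registers its target table (setAuditTable), optional key/value details (setAuditInfo) and an operation name (opName), and the base command class emits the audit line. What follows is a minimal, self-contained Scala sketch of that pattern, not the actual CarbonData base class (which is introduced elsewhere in this change set); AuditSupport, ExampleShowSegmentsCommand and the println-based audit() are illustrative stand-ins only.

    // Minimal sketch of the audit pattern this commit applies to the commands below.
    import scala.collection.mutable

    trait AuditSupport {
      private var auditTable: String = ""
      private val info = mutable.Map[String, String]()

      // Each command names its own operation, e.g. "LOAD DATA", "DROP DATAMAP".
      protected def opName: String

      protected def setAuditTable(dbName: String, tableName: String): Unit = {
        auditTable = s"$dbName.$tableName"
      }

      protected def setAuditInfo(map: Map[String, String]): Unit = {
        info ++= map
      }

      // In CarbonData this would go to a dedicated audit logger; here it is one line on stdout.
      def audit(): Unit = {
        println(s"audit: op=$opName table=$auditTable info=${info.toMap}")
      }
    }

    // Hypothetical command using the pattern, loosely mirroring CarbonShowLoadsCommand.
    class ExampleShowSegmentsCommand(db: String, table: String) extends AuditSupport {
      override protected def opName: String = "SHOW SEGMENTS"

      def run(): Unit = {
        setAuditTable(db, table)
        setAuditInfo(Map("showHistory" -> "false"))
        // ... command body would go here ...
        audit()
      }
    }

    object AuditSketch extends App {
      new ExampleShowSegmentsCommand("default", "sales").run()
    }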

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index c8c9a47..35b73d6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -39,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableMergeIndexEvent, Event, OperationContext, OperationEventListener}
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostExecutionEvent
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-import org.apache.carbondata.spark.util.CommonUtil
 
 class MergeIndexEventListener extends OperationEventListener with Logging {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -47,7 +45,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
-        Audit.log(LOGGER, "Load post status event-listener called for merge index")
+        LOGGER.info("Load post status event-listener called for merge index")
         val loadModel = preStatusUpdateEvent.getCarbonLoadModel
         val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
         val compactedSegments = loadModel.getMergedSegmentIds
@@ -73,7 +71,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           }
         }
       case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
-        Audit.log(LOGGER, "Merge index for compaction called")
+        LOGGER.info("Merge index for compaction called")
         val carbonTable = alterTableCompactionPostEvent.carbonTable
         val mergedLoads = alterTableCompactionPostEvent.compactedLoads
         val sparkSession = alterTableCompactionPostEvent.sparkSession
@@ -84,8 +82,6 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
         if (!carbonMainTable.isStreamingSink) {
-          Audit.log(LOGGER, s"Compaction request received for table " +
-                       s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
           LOGGER.info(s"Merge Index request received for table " +
                       s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
           val lock = CarbonLockFactory.getCarbonLockObj(
@@ -130,16 +126,11 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               clearBlockDataMapCache(carbonMainTable, validSegmentIds)
               val requestMessage = "Compaction request completed for table " +
                 s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              Audit.log(LOGGER, requestMessage)
               LOGGER.info(requestMessage)
             } else {
               val lockMessage = "Not able to acquire the compaction lock for table " +
-                                s"${ carbonMainTable.getDatabaseName }.${
-                                  carbonMainTable
-                                    .getTableName
-                                }"
-
-              Audit.log(LOGGER, lockMessage)
+                                s"${ carbonMainTable.getDatabaseName }." +
+                                s"${ carbonMainTable.getTableName}"
               LOGGER.error(lockMessage)
               CarbonException.analysisException(
                 "Table is already locked for compaction. Please try after some time.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 081482c..271a19b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,10 +22,8 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
@@ -63,6 +61,11 @@ case class CarbonCreateDataMapCommand(
       case _ => null
     }
 
+    if (mainTable != null) {
+      setAuditTable(mainTable)
+    }
+    setAuditInfo(Map("provider" -> dmProviderName, "dmName" -> dataMapName) ++ dmProperties)
+
     if (mainTable != null && !mainTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
@@ -75,7 +78,7 @@ case class CarbonCreateDataMapCommand(
       }
     }
 
-    if (mainTable !=null && CarbonUtil.getFormatVersion(mainTable) != ColumnarFormatVersion.V3) {
+    if (mainTable != null && CarbonUtil.getFormatVersion(mainTable) != ColumnarFormatVersion.V3) {
       throw new MalformedCarbonCommandException(s"Unsupported operation on table with " +
                                                 s"V1 or V2 format data")
     }
@@ -153,7 +156,6 @@ case class CarbonCreateDataMapCommand(
         systemFolderLocation, tableIdentifier, dmProviderName)
     OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
       operationContext)
-    Audit.log(LOGGER, s"DataMap $dataMapName successfully added")
     Seq.empty
   }
 
@@ -192,5 +194,7 @@ case class CarbonCreateDataMapCommand(
     }
     Seq.empty
   }
+
+  override protected def opName: String = "CREATE DATAMAP"
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
index be62ce4..267fedd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapRebuildCommand.scala
@@ -65,6 +65,8 @@ case class CarbonDataMapRebuildCommand(
         )(sparkSession)
     }
 
+    setAuditTable(table)
+
     val provider = DataMapManager.get().getDataMapProvider(table, schema, sparkSession)
     provider.rebuild()
 
@@ -87,4 +89,5 @@ case class CarbonDataMapRebuildCommand(
     Seq.empty
   }
 
+  override protected def opName: String = "REBUILD DATAMAP"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index ae33aa8..3cee810 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -58,6 +58,7 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
     tableIdentifier match {
       case Some(table) =>
         val carbonTable = CarbonEnv.getCarbonTable(table)(sparkSession)
+        setAuditTable(carbonTable)
         Checker.validateTableExists(table.database, table.table, sparkSession)
         if (carbonTable.hasDataMapSchema) {
           dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList)
@@ -97,4 +98,6 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
       Seq.empty
     }
   }
+
+  override protected def opName: String = "SHOW DATAMAP"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 67e2dee..38ec07d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -59,6 +58,7 @@ case class CarbonDropDataMapCommand(
   var dataMapSchema: DataMapSchema = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    setAuditInfo(Map("dmName" -> dataMapName))
     if (table.isDefined) {
       val databaseNameOp = table.get.database
       val tableName = table.get.table
@@ -78,6 +78,7 @@ case class CarbonDropDataMapCommand(
             null
         }
       }
+      setAuditTable(mainTable)
       val tableIdentifier =
         AbsoluteTableIdentifier
           .from(tablePath,
@@ -112,8 +113,6 @@ case class CarbonDropDataMapCommand(
         locksToBeAcquired foreach {
           lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
         }
-        Audit.log(LOGGER, s"Deleting datamap [$dataMapName] under table [$tableName]")
-
         // drop index,mv datamap on the main table.
         if (mainTable != null &&
             DataMapStoreManager.getInstance().getAllDataMap(mainTable).size() > 0) {
@@ -242,4 +241,5 @@ case class CarbonDropDataMapCommand(
     Seq.empty
   }
 
+  override protected def opName: String = "DROP DATAMAP"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 8e338db..1b1d708 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -33,7 +33,6 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -67,6 +66,10 @@ case class CarbonAlterTableCompactionCommand(
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableModel.tableName.toLowerCase
     val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
+    setAuditTable(dbName, tableName)
+    if (alterTableModel.customSegmentIds.nonEmpty) {
+      setAuditInfo(Map("segmentIds" -> alterTableModel.customSegmentIds.get.mkString(", ")))
+    }
     table = if (tableInfoOp.isDefined) {
       CarbonTable.buildFromTableInfo(tableInfoOp.get)
     } else {
@@ -217,8 +220,6 @@ case class CarbonAlterTableCompactionCommand(
       }
     }
 
-    Audit.log(LOGGER, s"Compaction request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
@@ -314,8 +315,6 @@ case class CarbonAlterTableCompactionCommand(
             throw e
         }
       } else {
-        Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Not able to acquire the compaction lock for table" +
                      s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         CarbonException.analysisException(
@@ -379,4 +378,8 @@ case class CarbonAlterTableCompactionCommand(
       }
     }
   }
+
+  override protected def opName: String = {
+    s"ALTER TABLE COMPACTION ${alterTableModel.compactionType.toUpperCase}"
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
index ba20773..8df0217 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -33,6 +33,7 @@ case class CarbonAlterTableFinishStreaming(
   extends MetadataCommand {
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession)
+    setAuditTable(carbonTable)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val streamingLock = CarbonLockFactory.getCarbonLockObj(
       carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
@@ -58,4 +59,6 @@ case class CarbonAlterTableFinishStreaming(
     }
     Seq.empty
   }
+
+  override protected def opName: String = "ALTER TABLE FINISH STREAMING"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index a390191..4b5b4b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -57,7 +57,10 @@ case class CarbonCleanFilesCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
-
+    setAuditTable(carbonTable)
+    setAuditInfo(Map(
+      "force" -> forceTableClean.toString,
+      "internal" -> isInternalCleanCall.toString))
     if (carbonTable.hasAggregationDataMap) {
       cleanFileCommands = carbonTable.getTableInfo.getDataMapSchemaList.asScala.map {
         dataMapSchema =>
@@ -150,4 +153,6 @@ case class CarbonCleanFilesCommand(
           .error("Failed to clean in progress segments", e)
     }
   }
+
+  override protected def opName: String = "CLEAN FILES"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
index bf5adc3..d1a54d0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCliCommand.scala
@@ -47,6 +47,8 @@ case class CarbonCliCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("options" -> commandOptions))
     val commandArgs: Seq[String] = commandOptions.split("\\s+")
     val finalCommands = commandArgs.collect {
       case a if a.trim.equalsIgnoreCase("summary") || a.trim.equalsIgnoreCase("benchmark") =>
@@ -59,4 +61,6 @@ case class CarbonCliCommand(
       Row(x)
     )
   }
+
+  override protected def opName: String = "CLI"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 165a032..275a0fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -35,7 +35,8 @@ case class CarbonDeleteLoadByIdCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("segmentIds" -> loadIds.mkString(", ")))
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
@@ -66,4 +67,6 @@ case class CarbonDeleteLoadByIdCommand(
     OperationListenerBus.getInstance.fireEvent(deleteSegmentPostEvent, operationContext)
     Seq.empty
   }
+
+  override protected def opName: String = "DELETE SEGMENT BY ID"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index 19f5100..db1b7b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -36,7 +36,8 @@ case class CarbonDeleteLoadByLoadDateCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("date" -> dateField))
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
@@ -66,4 +67,6 @@ case class CarbonDeleteLoadByLoadDateCommand(
 
     Seq.empty
   }
+
+  override protected def opName: String = "DELETE SEGMENT BY DATE"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index ee0f5ab..2a64b19 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -36,6 +36,7 @@ case class CarbonInsertIntoCommand(
   var loadCommand: CarbonLoadDataCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    setAuditTable(relation.carbonTable.getDatabaseName, relation.carbonTable.getTableName)
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     def containsLimit(plan: LogicalPlan): Boolean = {
       plan find {
@@ -82,9 +83,19 @@ case class CarbonInsertIntoCommand(
   }
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (null != loadCommand) {
-      loadCommand.processData(sparkSession)
+      val rows = loadCommand.processData(sparkSession)
+      setAuditInfo(loadCommand.auditInfo)
+      rows
     } else {
       Seq.empty
     }
   }
+
+  override protected def opName: String = {
+    if (overwrite) {
+      "INSERT OVERWRITE"
+    } else {
+      "INSERT INTO"
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 5fbe82e..bba8af7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -48,9 +48,8 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -65,12 +64,11 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{BuildDataMapPostExecutionEvent, BuildDataMapPreExecutionEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
-import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -113,6 +111,7 @@ case class CarbonLoadDataCommand(
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    setAuditTable(dbName, tableName)
     table = if (tableInfoOp.isDefined) {
         CarbonTable.buildFromTableInfo(tableInfoOp.get)
       } else {
@@ -123,7 +122,6 @@ case class CarbonLoadDataCommand(
         }
         if (null == relation.carbonTable) {
           LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
-          Audit.log(LOGGER, s"Data loading failed. table not found: $dbName.$tableName")
           throw new NoSuchTableException(dbName, tableName)
         }
         relation.carbonTable
@@ -189,13 +187,12 @@ case class CarbonLoadDataCommand(
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     val carbonLoadModel = new CarbonLoadModel()
-    try {
-      val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-      val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
-      optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
-        carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
-          carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
-            CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
+    val tableProperties = table.getTableInfo.getFactTable.getTableProperties
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
+    optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
+      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
+        carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+          CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
       optionsFinal
         .put("bad_record_path", CarbonBadRecordUtil.getBadRecordsPath(options.asJava, table))
@@ -216,176 +213,158 @@ case class CarbonLoadDataCommand(
           CompressorFactory.getInstance().getCompressor.getName)
       carbonLoadModel.setColumnCompressor(columnCompressor)
 
-      val javaPartition = mutable.Map[String, String]()
-      partition.foreach { case (k, v) =>
-        if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+    val javaPartition = mutable.Map[String, String]()
+    partition.foreach { case (k, v) =>
+      if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get
+    }
+
+    new CarbonLoadModelBuilder(table).build(
+      options.asJava,
+      optionsFinal,
+      carbonLoadModel,
+      hadoopConf,
+      javaPartition.asJava,
+      dataFrame.isDefined)
+    // Delete stale segment folders that are not in table status but are physically present in
+    // the Fact folder
+    LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
+    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    var isUpdateTableStatusRequired = false
+    // if the table is child then extract the uuid from the operation context and the parent would
+    // already generated UUID.
+    // if parent table then generate a new UUID else use empty.
+    val uuid = if (table.isChildDataMap) {
+      Option(operationContext.getProperty("uuid")).getOrElse("").toString
+    } else if (table.hasAggregationDataMap) {
+      UUID.randomUUID().toString
+    } else {
+      ""
+    }
+    try {
+      operationContext.setProperty("uuid", uuid)
+      val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
+        new LoadTablePreExecutionEvent(
+          table.getCarbonTableIdentifier,
+          carbonLoadModel)
+      operationContext.setProperty("isOverwrite", isOverwriteTable)
+      OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
+      // Add pre event listener for index datamap
+      val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
+      val dataMapOperationContext = new OperationContext()
+      if (tableDataMaps.size() > 0) {
+        val dataMapNames: mutable.Buffer[String] =
+          tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
+        val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
+          new BuildDataMapPreExecutionEvent(sparkSession,
+            table.getAbsoluteTableIdentifier, dataMapNames)
+        OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
+          dataMapOperationContext)
+      }
+      // First system has to partition the data first and then call the load data
+      LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
+      concurrentLoadLock = acquireConcurrentLoadLock()
+      // Clean up the old invalid segment data before creating a new entry for new load.
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
+      // add the start entry for the new load in the table status file
+      if (updateModel.isEmpty && !table.isHivePartitionTable) {
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
+          carbonLoadModel,
+          isOverwriteTable)
+        isUpdateTableStatusRequired = true
+      }
+      if (isOverwriteTable) {
+        LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
+      }
+      // if table is an aggregate table then disable single pass.
+      if (carbonLoadModel.isAggLoadRequest) {
+        carbonLoadModel.setUseOnePass(false)
       }
 
-      new CarbonLoadModelBuilder(table).build(
-        options.asJava,
-        optionsFinal,
-        carbonLoadModel,
-        hadoopConf,
-        javaPartition.asJava,
-        dataFrame.isDefined)
-      // Delete stale segment folders that are not in table status but are physically present in
-      // the Fact folder
-      LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
-      TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
-      var isUpdateTableStatusRequired = false
-      // if the table is child then extract the uuid from the operation context and the parent would
-      // already generated UUID.
-      // if parent table then generate a new UUID else use empty.
-      val uuid = if (table.isChildDataMap) {
-        Option(operationContext.getProperty("uuid")).getOrElse("").toString
-      } else if (table.hasAggregationDataMap) {
-        UUID.randomUUID().toString
-      } else {
-        ""
+      // start dictionary server when use one pass load and dimension with DICTIONARY
+      // encoding is present.
+      val allDimensions =
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+      val createDictionary = allDimensions.exists {
+        carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                           !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
       }
-      try {
-        operationContext.setProperty("uuid", uuid)
-        val loadTablePreExecutionEvent: LoadTablePreExecutionEvent =
-          new LoadTablePreExecutionEvent(
-            table.getCarbonTableIdentifier,
-            carbonLoadModel)
-        operationContext.setProperty("isOverwrite", isOverwriteTable)
-        OperationListenerBus.getInstance.fireEvent(loadTablePreExecutionEvent, operationContext)
-        // Add pre event listener for index datamap
-        val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(table)
-        val dataMapOperationContext = new OperationContext()
-        if (tableDataMaps.size() > 0) {
-          val dataMapNames: mutable.Buffer[String] =
-            tableDataMaps.asScala.map(dataMap => dataMap.getDataMapSchema.getDataMapName)
-          val buildDataMapPreExecutionEvent: BuildDataMapPreExecutionEvent =
-            new BuildDataMapPreExecutionEvent(sparkSession,
-              table.getAbsoluteTableIdentifier, dataMapNames)
-          OperationListenerBus.getInstance().fireEvent(buildDataMapPreExecutionEvent,
-            dataMapOperationContext)
-        }
-        // First system has to partition the data first and then call the load data
-        LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-        concurrentLoadLock = acquireConcurrentLoadLock()
-        // Clean up the old invalid segment data before creating a new entry for new load.
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
-        // add the start entry for the new load in the table status file
-        if (updateModel.isEmpty && !table.isHivePartitionTable) {
-          CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
-            carbonLoadModel,
-            isOverwriteTable)
-          isUpdateTableStatusRequired = true
-        }
-        if (isOverwriteTable) {
-          LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
-        }
-        // if table is an aggregate table then disable single pass.
-        if (carbonLoadModel.isAggLoadRequest) {
-          carbonLoadModel.setUseOnePass(false)
+      if (!createDictionary) {
+        carbonLoadModel.setUseOnePass(false)
+      }
+      // Create table and metadata folders if not exist
+      if (carbonLoadModel.isCarbonTransactionalTable) {
+        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+        val fileType = FileFactory.getFileType(metadataDirectoryPath)
+        if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+          FileFactory.mkdirs(metadataDirectoryPath, fileType)
         }
+      } else {
+        carbonLoadModel.setSegmentId(System.currentTimeMillis().toString)
+      }
+      val partitionStatus = SegmentStatus.SUCCESS
+      val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
+      if (carbonLoadModel.getUseOnePass) {
+        loadDataUsingOnePass(
+          sparkSession,
+          carbonProperty,
+          carbonLoadModel,
+          columnar,
+          partitionStatus,
+          hadoopConf,
+          operationContext,
+          LOGGER)
+      } else {
+        loadData(
+          sparkSession,
+          carbonLoadModel,
+          columnar,
+          partitionStatus,
+          hadoopConf,
+          operationContext,
+          LOGGER)
+      }
+      val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
+        new LoadTablePostExecutionEvent(
+          table.getCarbonTableIdentifier,
+          carbonLoadModel)
+      OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
+      if (tableDataMaps.size() > 0) {
+        val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
+          table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
+        OperationListenerBus.getInstance()
+          .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+      }
 
-        // start dictionary server when use one pass load and dimension with DICTIONARY
-        // encoding is present.
-        val allDimensions =
-        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
-        val createDictionary = allDimensions.exists {
-          carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                             !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-        }
-        if (!createDictionary) {
-          carbonLoadModel.setUseOnePass(false)
-        }
-        // Create table and metadata folders if not exist
-        if (carbonLoadModel.isCarbonTransactionalTable) {
-          val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
-          val fileType = FileFactory.getFileType(metadataDirectoryPath)
-          if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
-            FileFactory.mkdirs(metadataDirectoryPath, fileType)
-          }
-        } else {
-          carbonLoadModel.setSegmentId(System.currentTimeMillis().toString)
-        }
-        val partitionStatus = SegmentStatus.SUCCESS
-        val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
-        if (carbonLoadModel.getUseOnePass) {
-          loadDataUsingOnePass(
-            sparkSession,
-            carbonProperty,
-            carbonLoadModel,
-            columnar,
-            partitionStatus,
-            hadoopConf,
-            operationContext,
-            LOGGER)
-        } else {
-          loadData(
-            sparkSession,
-            carbonLoadModel,
-            columnar,
-            partitionStatus,
-            hadoopConf,
-            operationContext,
-            LOGGER)
-        }
-        val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
-          new LoadTablePostExecutionEvent(
-            table.getCarbonTableIdentifier,
-            carbonLoadModel)
-        OperationListenerBus.getInstance.fireEvent(loadTablePostExecutionEvent, operationContext)
-        if (tableDataMaps.size() > 0) {
-          val buildDataMapPostExecutionEvent = BuildDataMapPostExecutionEvent(sparkSession,
-            table.getAbsoluteTableIdentifier, null, Seq(carbonLoadModel.getSegmentId), false)
-          OperationListenerBus.getInstance()
-            .fireEvent(buildDataMapPostExecutionEvent, dataMapOperationContext)
+    } catch {
+      case CausedBy(ex: NoRetryException) =>
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
-
-      } catch {
-        case CausedBy(ex: NoRetryException) =>
-          // update the load entry in table status file for changing the status to marked for delete
-          if (isUpdateTableStatusRequired) {
-            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
-          }
-          LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
-          throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
-        // In case of event related exception
-        case preEventEx: PreEventException =>
-          LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
-          throw new AnalysisException(preEventEx.getMessage)
-        case ex: Exception =>
-          LOGGER.error(ex)
-          // update the load entry in table status file for changing the status to marked for delete
-          if (isUpdateTableStatusRequired) {
-            CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
-          }
-          Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. Please check the logs")
-          throw ex
-      } finally {
-        releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
-        // Once the data load is successful delete the unwanted partition files
-        try {
-          val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
-                                  table.getDatabaseName + "/" +
-                                  table.getTableName + "/"
-          val fileType = FileFactory.getFileType(partitionLocation)
-          if (FileFactory.isFileExist(partitionLocation, fileType)) {
-            val file = FileFactory.getCarbonFile(partitionLocation, fileType)
-            CarbonUtil.deleteFoldersAndFiles(file)
-          }
-        } catch {
-          case ex: Exception =>
-            LOGGER.error(ex)
-            Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. " +
-                         "Problem deleting the partition folder")
-            throw ex
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
+        throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
+      // In case of event related exception
+      case preEventEx: PreEventException =>
+        LOGGER.error(s"Dataload failure for $dbName.$tableName", preEventEx)
+        throw new AnalysisException(preEventEx.getMessage)
+      case ex: Exception =>
+        LOGGER.error(ex)
+        // update the load entry in table status file for changing the status to marked for delete
+        if (isUpdateTableStatusRequired) {
+          CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
         }
-
+        throw ex
+    } finally {
+      releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
+      // Once the data load is successful delete the unwanted partition files
+      val partitionLocation = CarbonProperties.getStorePath + "/partition/" +
+                              table.getDatabaseName + "/" +
+                              table.getTableName + "/"
+      val fileType = FileFactory.getFileType(partitionLocation)
+      if (FileFactory.isFileExist(partitionLocation, fileType)) {
+        val file = FileFactory.getCarbonFile(partitionLocation, fileType)
+        CarbonUtil.deleteFoldersAndFiles(file)
       }
-    } catch {
-      case dle: DataLoadingException =>
-        Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
-        throw dle
-      case mce: MalformedCarbonCommandException =>
-        Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
-        throw mce
     }
     Seq.empty
   }
@@ -543,7 +522,7 @@ case class CarbonLoadDataCommand(
         }
       }
     } else {
-      CarbonDataRDDFactory.loadCarbonData(
+      val loadResult = CarbonDataRDDFactory.loadCarbonData(
         sparkSession.sqlContext,
         carbonLoadModel,
         columnar,
@@ -554,10 +533,25 @@ case class CarbonLoadDataCommand(
         loadDataFrame,
         updateModel,
         operationContext)
+      if (loadResult != null) {
+        val info = makeAuditInfo(loadResult)
+        setAuditInfo(info)
+      }
     }
     rows
   }
 
+  private def makeAuditInfo(loadResult: LoadMetadataDetails): Map[String, String] = {
+    if (loadResult != null) {
+      Map(
+        "SegmentId" -> loadResult.getLoadName,
+        "DataSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getDataSize)),
+        "IndexSize" -> Strings.formatSize(java.lang.Long.parseLong(loadResult.getIndexSize)))
+    } else {
+      Map()
+    }
+  }
+
   private def loadData(
       sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
@@ -593,7 +587,7 @@ case class CarbonLoadDataCommand(
         loadDataFrame,
         operationContext, LOGGER)
     } else {
-      CarbonDataRDDFactory.loadCarbonData(
+      val loadResult = CarbonDataRDDFactory.loadCarbonData(
         sparkSession.sqlContext,
         carbonLoadModel,
         columnar,
@@ -604,6 +598,8 @@ case class CarbonLoadDataCommand(
         loadDataFrame,
         updateModel,
         operationContext)
+      val info = makeAuditInfo(loadResult)
+      setAuditInfo(info)
     }
     rows
   }
@@ -1132,4 +1128,11 @@ case class CarbonLoadDataCommand(
     (dataFrameWithTupleId)
   }
 
+  override protected def opName: String = {
+    if (isOverwriteTable) {
+      "LOAD DATA OVERWRITE"
+    } else {
+      "LOAD DATA"
+    }
+  }
 }
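As a rough illustration of what the makeAuditInfo method added above contributes to the audit record for a finished load, the sketch below builds the same three fields; formatSize here is a local stand-in for org.apache.carbondata.common.Strings.formatSize (whose exact formatting may differ) and the sample values are hypothetical.

    // Rough sketch: segment id plus human-readable data/index sizes for the audit record.
    object LoadAuditInfoSketch extends App {
      def formatSize(bytes: Long): String = {
        val units = Seq("B", "KB", "MB", "GB", "TB")
        var size = bytes.toDouble
        var i = 0
        while (size >= 1024 && i < units.length - 1) { size /= 1024; i += 1 }
        f"$size%.2f${units(i)}"
      }

      // Values as they might come from LoadMetadataDetails (size fields are strings
      // in the table status file, hence the parseLong).
      val info = Map(
        "SegmentId" -> "0",
        "DataSize" -> formatSize(java.lang.Long.parseLong("10485760")),
        "IndexSize" -> formatSize(java.lang.Long.parseLong("2048")))
      println(info) // e.g. Map(SegmentId -> 0, DataSize -> 10.00MB, IndexSize -> 2.00KB)
    }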

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index 3f68cc4..4a35e6e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -59,6 +59,7 @@ case class CarbonShowLoadsCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    setAuditTable(carbonTable)
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
@@ -68,4 +69,6 @@ case class CarbonShowLoadsCommand(
       showHistory
     )
   }
+
+  override protected def opName: String = "SHOW SEGMENTS"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 50b88c8..b35c285 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, Me
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -54,6 +53,7 @@ case class RefreshCarbonTableCommand(
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val databaseName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    setAuditTable(databaseName, tableName)
     // Steps
     // 1. get table path
     // 2. perform the below steps
@@ -92,7 +92,6 @@ case class RefreshCarbonTableCommand(
             val msg = s"Table registration with Database name [$databaseName] and Table name " +
                       s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
                       s" not copied under database [$databaseName]"
-            Audit.log(LOGGER, msg)
             throwMetadataException(databaseName, tableName, msg)
           }
           // 2.2.1 Register the aggregate tables to hive
@@ -104,18 +103,7 @@ case class RefreshCarbonTableCommand(
             tableInfo.getFactTable.getPartitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
           registerAllPartitionsToHive(identifier, sparkSession)
         }
-      } else {
-        Audit.log(LOGGER,
-          s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
-          s"failed." +
-          s"Table [$tableName] either non carbon table or stale carbon table under database " +
-          s"[$databaseName]")
       }
-    } else {
-      Audit.log(LOGGER,
-        s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
-        s"failed." +
-        s"Table [$tableName] either already exists or registered under database [$databaseName]")
     }
     // update the schema modified time
     metaStore.updateAndTouchSchemasUpdatedTime()
@@ -185,8 +173,6 @@ case class RefreshCarbonTableCommand(
       OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
       CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
         .run(sparkSession)
-      Audit.log(LOGGER, s"Table registration with Database name [$dbName] and Table name " +
-                   s"[$tableName] is successful.")
     } catch {
       case e: AnalysisException => throw e
       case e: Exception =>
@@ -288,4 +274,6 @@ case class RefreshCarbonTableCommand(
       AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession)
     }
   }
+
+  override protected def opName: String = "REFRESH TABLE"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 053937b..70a4350 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -21,14 +21,11 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
@@ -48,6 +45,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("plan" -> plan.simpleString))
     if (!carbonTable.getTableInfo.isTransactionalTable) {
       throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
@@ -81,8 +80,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
     var lockStatus = false
     try {
       lockStatus = metadataLock.lockWithRetries()
-      Audit.log(LOGGER, s" Delete data request has been received " +
-                   s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
       if (lockStatus) {
         LOGGER.info("Successfully able to get the table metadata file lock")
       } else {
@@ -140,4 +137,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
     }
     Seq.empty
   }
+
+  override protected def opName: String = "DELETE DATA"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 31e1779..0f23081 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -60,6 +60,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+    setAuditTable(carbonTable)
+    setAuditInfo(Map("plan" -> plan.simpleString))
     columns.foreach { col =>
       val dataType = carbonTable.getColumnByName(tableName, col).getColumnSchema.getDataType
       if (dataType.isComplexType) {
@@ -276,4 +278,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
     Seq.empty
 
   }
+
+  override protected def opName: String = "UPDATE DATA"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index d118539..0f68004 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -32,8 +32,6 @@ import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.api.CarbonStore.LOGGER
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -166,7 +164,6 @@ object DeleteExecution {
           } else {
             // In case of failure , clean all related delete delta files
             CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-            Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
             val errorMsg =
               "Delete data operation is failed due to failure in creating delete delta file for " +
               "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
@@ -201,19 +198,14 @@ object DeleteExecution {
               listOfSegmentToBeMarkedDeleted)
       ) {
         LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
-        Audit.log(LOGGER, s"Delete data operation is successful for ${ database }.${ tableName }")
-      }
-      else {
+      } else {
         // In case of failure , clean all related delete delta files
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-
         val errorMessage = "Delete data operation is failed due to failure " +
                            "in table status updation."
-        Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
         LOGGER.error("Delete data operation is failed due to failure in table status updation.")
         executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
         executorErrors.errorMsg = errorMessage
-        // throw new Exception(errorMessage)
       }
     }
 
@@ -290,12 +282,10 @@ object DeleteExecution {
           deleteStatus = SegmentStatus.SUCCESS
         } catch {
           case e : MultipleMatchingException =>
-            Audit.log(LOGGER, e.getMessage)
             LOGGER.error(e.getMessage)
           // dont throw exception here.
           case e: Exception =>
             val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
-            Audit.log(LOGGER, errorMsg)
             LOGGER.error(errorMsg + e.getMessage)
             throw e
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 3472d8a..6224d0d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact
 import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -130,7 +129,6 @@ object HorizontalCompaction {
     }
 
     LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].")
-    Audit.log(LOG, s"Horizontal Update Compaction operation started for [$db.$table].")
 
     try {
       // Update Compaction.
@@ -154,7 +152,6 @@ object HorizontalCompaction {
           s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
     }
     LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
-    Audit.log(LOG, s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
   }
 
   /**
@@ -180,7 +177,6 @@ object HorizontalCompaction {
     }
 
     LOG.info(s"Horizontal Delete Compaction operation started for [$db.$table].")
-    Audit.log(LOG, s"Horizontal Delete Compaction operation started for [$db.$table].")
 
     try {
 
@@ -225,7 +221,6 @@ object HorizontalCompaction {
         timestamp.toString,
         segmentUpdateStatusManager)
       if (updateStatus == false) {
-        Audit.log(LOG, s"Delete Compaction data operation is failed for [$db.$table].")
         LOG.error("Delete Compaction data operation is failed.")
         throw new HorizontalCompactionException(
           s"Horizontal Delete Compaction Failed for [$db.$table] ." +
@@ -233,7 +228,6 @@ object HorizontalCompaction {
       }
       else {
         LOG.info(s"Horizontal Delete Compaction operation completed for [$db.$table].")
-        Audit.log(LOG, s"Horizontal Delete Compaction operation completed for [$db.$table].")
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
index 4224efa..f7f76b9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.spark.sql._
@@ -24,6 +25,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.processing.util.Auditor
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 object Checker {
@@ -61,20 +64,71 @@ trait DataProcessOperation {
 }
 
 /**
+ * A utility that runs the command and records audit logs
+ */
+trait Auditable {
+  // operation id that will be written in the audit log
+  private val operationId: String = String.valueOf(System.nanoTime())
+
+  // extra info to be written in the audit log, set by the subclass of AtomicRunnableCommand
+  var auditInfo: Map[String, String] = _
+
+  // holds the dbName and tableName this command is executed for,
+  // used for the audit log, set by the subclass of AtomicRunnableCommand
+  private var table: String = _
+
+  // implemented by the subclass, returns the operation name to record in the audit log
+  protected def opName: String
+
+  protected def opTime(startTime: Long) = s"${System.currentTimeMillis() - startTime} ms"
+
+  protected def setAuditTable(dbName: String, tableName: String): Unit =
+    table = s"$dbName.$tableName"
+
+  protected def setAuditTable(carbonTable: CarbonTable): Unit =
+    table = s"${carbonTable.getDatabaseName}.${carbonTable.getTableName}"
+
+  protected def setAuditInfo(map: Map[String, String]): Unit = auditInfo = map
+
+  /**
+   * Run the passed command and record the audit log.
+   * Two audit logs are written: one when the operation starts, another on success or failure
+   * @param runCmd command to run
+   * @param spark spark session
+   * @return command result
+   */
+  protected def runWithAudit(runCmd: (SparkSession => Seq[Row]), spark: SparkSession): Seq[Row] = {
+    val start = System.currentTimeMillis()
+    Auditor.logOperationStart(opName, operationId)
+    val rows = try {
+      runCmd(spark)
+    } catch {
+      case e: Throwable =>
+        val map = Map("Exception" -> e.getClass.getName, "Message" -> e.getMessage)
+        Auditor.logOperationEnd(opName, operationId, false, table, opTime(start), map.asJava)
+        throw e
+    }
+    Auditor.logOperationEnd(opName, operationId, true, table, opTime(start),
+      if (auditInfo != null) auditInfo.asJava else new java.util.HashMap[String, String]())
+    rows
+  }
+}
+
+/**
  * Command that modifies metadata(schema, table_status, etc) only without processing data
  */
-abstract class MetadataCommand extends RunnableCommand with MetadataProcessOpeation {
+abstract class MetadataCommand extends RunnableCommand with MetadataProcessOpeation with Auditable {
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    processMetadata(sparkSession)
+    runWithAudit(processMetadata, sparkSession)
   }
 }
 
 /**
  * Command that process data only without modifying metadata
  */
-abstract class DataCommand extends RunnableCommand with DataProcessOperation {
+abstract class DataCommand extends RunnableCommand with DataProcessOperation with Auditable {
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
+    runWithAudit(processData, sparkSession)
   }
 }
 
@@ -84,17 +138,19 @@ abstract class DataCommand extends RunnableCommand with DataProcessOperation {
  * if process data failed.
  */
 abstract class AtomicRunnableCommand
-  extends RunnableCommand with MetadataProcessOpeation with DataProcessOperation {
+  extends RunnableCommand with MetadataProcessOpeation with DataProcessOperation with Auditable {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    processMetadata(sparkSession)
-    try {
-      processData(sparkSession)
-    } catch {
-      case e: Exception =>
-        undoMetadata(sparkSession, e)
-        throw e
-    }
+    runWithAudit(spark => {
+      processMetadata(spark)
+      try {
+        processData(spark)
+      } catch {
+        case e: Exception =>
+          undoMetadata(spark, e)
+          throw e
+      }
+    }, sparkSession)
   }
 
   /**
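
For illustration only, a minimal sketch of how a concrete command plugs into the new
Auditable hooks added above. The command name and its body here are hypothetical and not
part of this patch; it only assumes the Auditable trait, MetadataCommand base class and
Auditor wiring introduced in package.scala.

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.execution.command.MetadataCommand

  // Hypothetical example command, not part of this commit
  case class CarbonExampleCommand(dbName: String, tableName: String)
    extends MetadataCommand {

    override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
      // record which table the operation touches and any extra key-value info;
      // both are picked up by runWithAudit when it writes the operation-end audit line
      setAuditTable(dbName, tableName)
      setAuditInfo(Map("reason" -> "example"))
      Seq.empty
    }

    // operation name that appears in both the start and end audit lines
    override protected def opName: String = "EXAMPLE OPERATION"
  }

Because MetadataCommand.run delegates to runWithAudit, the subclass itself never calls
Auditor directly; it only supplies opName and the optional table/info fields.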

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 6c8b0b0..dcaac98 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -55,6 +55,8 @@ case class CarbonAlterTableAddHivePartitionCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    setAuditTable(table)
+    setAuditInfo(Map("partition" -> partitionSpecsAndLocs.mkString(", ")))
     if (table.isHivePartitionTable) {
       if (table.isChildDataMap) {
         throw new UnsupportedOperationException("Cannot add partition directly on aggregate tables")
@@ -156,4 +158,5 @@ case class CarbonAlterTableAddHivePartitionCommand(
     Seq.empty[Row]
   }
 
+  override protected def opName: String = "ADD HIVE PARTITION"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 1e987b0..a4629f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -68,6 +68,8 @@ case class CarbonAlterTableDropHivePartitionCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    setAuditTable(table)
+    setAuditInfo(Map("partition" -> specs.mkString(",")))
     if (table.isHivePartitionTable) {
       var locks = List.empty[ICarbonLock]
       try {
@@ -204,4 +206,5 @@ case class CarbonAlterTableDropHivePartitionCommand(
     Seq.empty[Row]
   }
 
+  override protected def opName: String = "DROP HIVE PARTITION"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index c230322..d4a2d7f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
@@ -58,6 +57,8 @@ case class CarbonAlterTableDropPartitionCommand(
     }
     val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
     val tableName = model.tableName
+    setAuditTable(dbName, tableName)
+    setAuditInfo(Map("partition" -> model.partitionId))
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
@@ -169,7 +170,6 @@ case class CarbonAlterTableDropPartitionCommand(
       LOGGER.info("Locks released after alter table drop partition action.")
     }
     LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
-    Audit.log(LOGGER, s"Alter table drop partition is successful for table $dbName.$tableName")
     Seq.empty
   }
 
@@ -178,8 +178,6 @@ case class CarbonAlterTableDropPartitionCommand(
       carbonLoadModel: CarbonLoadModel,
       dropWithData: Boolean,
       oldPartitionIds: List[Int]): Unit = {
-    Audit.log(LOGGER, s"Drop partition request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     try {
       startDropThreads(
         sqlContext,
@@ -237,6 +235,8 @@ case class CarbonAlterTableDropPartitionCommand(
       }
     }
   }
+
+  override protected def opName: String = "DROP CUSTOM PARTITION"
 }
 
 case class dropPartitionThread(sqlContext: SQLContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 8b337c6..18c47a9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -62,6 +61,8 @@ case class CarbonAlterTableSplitPartitionCommand(
     val dbName = splitPartitionModel.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
     val carbonMetaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val tableName = splitPartitionModel.tableName
+    setAuditTable(dbName, tableName)
+    setAuditInfo(Map("partition" -> splitPartitionModel.partitionId))
     val relation = carbonMetaStore.lookupRelation(Option(dbName), tableName)(sparkSession)
       .asInstanceOf[CarbonRelation]
     val tablePath = relation.carbonTable.getTablePath
@@ -187,8 +188,6 @@ case class CarbonAlterTableSplitPartitionCommand(
       LOGGER.info("Locks released after alter table add/split partition action.")
       if (success) {
         LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
-        Audit.log(LOGGER,
-          s"Alter table add/split partition is successful for table $dbName.$tableName")
       }
     }
     Seq.empty
@@ -200,8 +199,6 @@ case class CarbonAlterTableSplitPartitionCommand(
       carbonLoadModel: CarbonLoadModel,
       oldPartitionIdList: List[Int]
   ): Unit = {
-    Audit.log(LOGGER, s"Add partition request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     try {
       startSplitThreads(sqlContext,
         carbonLoadModel,
@@ -257,6 +254,8 @@ case class CarbonAlterTableSplitPartitionCommand(
       }
     }
   }
+
+  override protected def opName: String = "SPLIT CUSTOM PARTITION"
 }
 
 case class SplitThread(sqlContext: SQLContext,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
index 9419b00..2915981 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
@@ -39,6 +39,7 @@ private[sql] case class CarbonShowCarbonPartitionsCommand(
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
     val carbonTable = relation.carbonTable
+    setAuditTable(carbonTable)
     val partitionInfo = carbonTable.getPartitionInfo(
       carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
     if (partitionInfo == null) {
@@ -51,4 +52,6 @@ private[sql] case class CarbonShowCarbonPartitionsCommand(
     LOGGER.info("partition column name:" + columnName)
     CommonUtil.getPartitionInfo(columnName, partitionType, partitionInfo)
   }
+
+  override protected def opName: String = "SHOW CUSTOM PARTITION"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index 1f1e7bd..719ed4a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -44,7 +43,7 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
     val tableName = alterTableAddColumnsModel.tableName
     val dbName = alterTableAddColumnsModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
-    Audit.log(LOGGER, s"Alter table add columns request has been received for $dbName.$tableName")
+    setAuditTable(dbName, tableName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     var timeStamp = 0L
@@ -82,6 +81,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
         wrapperTableInfo,
         carbonTable.getAbsoluteTableIdentifier,
         sparkSession.sparkContext).process
+      setAuditInfo(Map(
+        "newColumn" -> newCols.map(x => s"${x.getColumnName}:${x.getDataType}").mkString(",")))
       // generate dictionary files for the newly added columns
       new AlterTableAddColumnRDD(sparkSession,
         newCols,
@@ -105,10 +106,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
           carbonTable, alterTableAddColumnsModel)
       OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
       LOGGER.info(s"Alter table for add columns is successful for table $dbName.$tableName")
-      Audit.log(LOGGER, s"Alter table for add columns is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error("Alter table add columns failed", e)
         if (newCols.nonEmpty) {
           LOGGER.info("Cleaning up the dictionary files as alter table add operation failed")
           new AlterTableDropColumnRDD(sparkSession,
@@ -124,4 +123,6 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
     }
     Seq.empty
   }
+
+  override protected def opName: String = "ALTER TABLE ADD COLUMN"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 716b9c9..2bcd3aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -45,8 +44,10 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
     val tableName = alterTableDataTypeChangeModel.tableName
     val dbName = alterTableDataTypeChangeModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
-    Audit.log(LOGGER,
-      s"Alter table change data type request has been received for $dbName.$tableName")
+    setAuditTable(dbName, tableName)
+    setAuditInfo(Map(
+      "column" -> alterTableDataTypeChangeModel.columnName,
+      "newType" -> alterTableDataTypeChangeModel.dataTypeInfo.dataType))
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
     var locks = List.empty[ICarbonLock]
     // get the latest carbon table and check for column existence
@@ -70,16 +71,12 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       val columnName = alterTableDataTypeChangeModel.columnName
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
-        Audit.log(LOGGER, s"Alter table change data type request has failed. " +
-                     s"Column $columnName does not exist")
         throwMetadataException(dbName, tableName, s"Column does not exist: $columnName")
       }
       val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
       if (carbonColumn.size == 1) {
         validateColumnDataType(alterTableDataTypeChangeModel.dataTypeInfo, carbonColumn.head)
       } else {
-        Audit.log(LOGGER, s"Alter table change data type request has failed. " +
-                     s"Column $columnName is invalid")
         throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
       }
       // read the latest schema file
@@ -118,11 +115,8 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
           alterTableDataTypeChangeModel)
       OperationListenerBus.getInstance.fireEvent(alterTablePostExecutionEvent, operationContext)
       LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
-      Audit.log(LOGGER,
-        s"Alter table for data type change is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error("Alter table change datatype failed : " + e.getMessage)
         if (carbonTable != null) {
           AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
         }
@@ -181,4 +175,6 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
                   s"Only Int and Decimal data types are allowed for modification")
     }
   }
+
+  override protected def opName: String = "ALTER TABLE CHANGE DATA TYPE"
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa274772/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index d601ed6..ccf9e54 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -46,7 +45,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
     val tableName = alterTableDropColumnModel.tableName
     val dbName = alterTableDropColumnModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
-    Audit.log(LOGGER, s"Alter table drop columns request has been received for $dbName.$tableName")
+    setAuditTable(dbName, tableName)
+    setAuditInfo(Map("column" -> alterTableDropColumnModel.columns.mkString(", ")))
     var locks = List.empty[ICarbonLock]
     var timeStamp = 0L
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
@@ -161,10 +161,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       OperationListenerBus.getInstance().fireEvent(alterTableDropColumnPostEvent, operationContext)
 
       LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
-      Audit.log(LOGGER, s"Alter table for drop columns is successful for table $dbName.$tableName")
     } catch {
       case e: Exception =>
-        LOGGER.error("Alter table drop columns failed : " + e.getMessage)
         if (carbonTable != null) {
           AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
         }
@@ -176,4 +174,6 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
     }
     Seq.empty
   }
+
+  override protected def opName: String = "ALTER TABLE DROP COLUMN"
 }