You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/10/18 01:56:57 UTC

[3/6] carbondata git commit: [CARBONDATA-3024] Refactor to use log4j Logger directly

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 57b2e44..3f0eb71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -28,7 +28,9 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskCont
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -335,7 +337,7 @@ object StreamHandoffRDD {
     } catch {
       case ex: Exception =>
         loadStatus = SegmentStatus.LOAD_FAILURE
-        LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
+        LOGGER.error(s"Handoff failed on streaming segment $handoffSegmenId", ex)
         errorMessage = errorMessage + ": " + ex.getCause.getMessage
         LOGGER.error(errorMessage)
     }
@@ -345,7 +347,7 @@ object StreamHandoffRDD {
       LOGGER.info("********starting clean up**********")
       CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
       LOGGER.info("********clean up done**********")
-      LOGGER.audit(s"Handoff is failed for " +
+      Audit.log(LOGGER, s"Handoff is failed for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Cannot write load metadata file as handoff failed")
       throw new Exception(errorMessage)
@@ -367,7 +369,7 @@ object StreamHandoffRDD {
         .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
       if (!done) {
         val errorMessage = "Handoff failed due to failure in table status updation."
-        LOGGER.audit("Handoff is failed for " +
+        Audit.log(LOGGER, "Handoff is failed for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error("Handoff failed due to failure in table status updation.")
         throw new Exception(errorMessage)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 2cc2a5b..1e8d148 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
 import scala.util.Try
 
 import com.univocity.parsers.common.TextParsingException
+import org.apache.log4j.Logger
 import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
@@ -363,7 +364,7 @@ object CarbonScalaUtil {
   /**
    * Retrieve error message from exception
    */
-  def retrieveAndLogErrorMsg(ex: Throwable, logger: LogService): (String, String) = {
+  def retrieveAndLogErrorMsg(ex: Throwable, logger: Logger): (String, String) = {
     var errorMessage = "DataLoad failure"
     var executorMessage = ""
     if (ex != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 67c4c9b..704382f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -696,17 +696,17 @@ object GlobalDictionaryUtil {
     } catch {
       case ex: Exception =>
         if (ex.getCause != null && ex.getCause.isInstanceOf[NoRetryException]) {
-          LOGGER.error(ex.getCause, "generate global dictionary failed")
+          LOGGER.error("generate global dictionary failed", ex.getCause)
           throw new Exception("generate global dictionary failed, " +
                               ex.getCause.getMessage)
         }
         ex match {
           case spx: SparkException =>
-            LOGGER.error(spx, "generate global dictionary failed")
+            LOGGER.error("generate global dictionary failed", spx)
             throw new Exception("generate global dictionary failed, " +
                                 trimErrorMessage(spx.getMessage))
           case _ =>
-            LOGGER.error(ex, "generate global dictionary failed")
+            LOGGER.error("generate global dictionary failed", ex)
             throw ex
         }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 5e0fe8b..e1dd0af 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,7 +29,9 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.util.CarbonException
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -280,7 +282,7 @@ class AlterTableColumnSchemaGenerator(
       .foreach(f => if (f._2.size > 1) {
         val name = f._1
         LOGGER.error(s"Duplicate column found with name: $name")
-        LOGGER.audit(
+        Audit.log(LOGGER,
           s"Validation failed for Create/Alter Table Operation " +
           s"for ${ dbName }.${ alterTableModel.tableName }. " +
           s"Duplicate column found with name: $name")
@@ -289,7 +291,7 @@ class AlterTableColumnSchemaGenerator(
 
     if (newCols.exists(_.getDataType.isComplexType)) {
       LOGGER.error(s"Complex column cannot be added")
-      LOGGER.audit(
+      Audit.log(LOGGER,
         s"Validation failed for Create/Alter Table Operation " +
         s"for ${ dbName }.${ alterTableModel.tableName }. " +
         s"Complex column cannot be added")
@@ -780,7 +782,7 @@ class TableNewProcessor(cm: TableModel) {
       if (f._2.size > 1) {
         val name = f._1
         LOGGER.error(s"Duplicate column found with name: $name")
-        LOGGER.audit(
+        Audit.log(LOGGER,
           s"Validation failed for Create/Alter Table Operation " +
           s"Duplicate column found with name: $name")
         CarbonException.analysisException(s"Duplicate dimensions found with name: $name")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 6d93b34..2fdbc86 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -283,7 +283,7 @@ object CarbonAppendableStreamSink {
         case t: Throwable =>
           val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
           StreamSegment.recoverSegmentIfRequired(segmentDir)
-          LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
+          LOGGER.error(s"Aborting job ${ job.getJobID }.", t)
           committer.abortJob(job)
           throw new CarbonStreamException("Job failed to write data file", t)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index e5552db..87106e0 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -24,7 +24,9 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.io.IOUtils
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.HdfsFileLock
 import org.apache.carbondata.core.util.CarbonUtil
@@ -48,7 +50,7 @@ object ResourceRegisterAndCopier {
     if (!file.exists()) {
       sys.error(s"""Provided path $hdfsPath does not exist""")
     }
-    LOGGER.audit("Try downloading resource data")
+    Audit.log(LOGGER, "Try downloading resource data")
     val lock = new HdfsFileLock(hdfsPath, "/resource.lock")
     var bool = false
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 6acf31f..5121027 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -23,7 +23,6 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
 import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
 
-import org.apache.parquet.column.Encoding;
 import org.apache.spark.sql.CarbonVectorProxy;
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
 import org.apache.spark.sql.types.Decimal;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 67ea497..779c62f 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.common.logging.LogService;
+import org.apache.log4j.Logger;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -65,7 +65,7 @@ import org.apache.spark.sql.types.StructType;
  */
 public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
 
-  private static final LogService LOGGER =
+  private static final Logger LOGGER =
       LogServiceFactory.getLogService(VectorizedCarbonRecordReader.class.getName());
 
   private int batchIdx = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index e3fec10..3f486d0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -403,7 +403,7 @@ class IndexDataMapRebuildRDD[K, V](
             reader.close()
           } catch {
             case ex: Throwable =>
-              LOGGER.error(ex, "Failed to close reader")
+              LOGGER.error("Failed to close reader", ex)
           }
         }
 
@@ -412,7 +412,7 @@ class IndexDataMapRebuildRDD[K, V](
             refresher.close()
           } catch {
             case ex: Throwable =>
-              LOGGER.error(ex, "Failed to close index writer")
+              LOGGER.error("Failed to close index writer", ex)
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 82bae8e..a0bdd64 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command.CompactionModel
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -130,8 +132,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
                                   carbonLoadModel.getTableName)
         LOGGER
           .info(s"Compaction request for datamap ${ carbonTable.getTableUniqueName } is successful")
-        LOGGER
-          .audit(s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful")
+        Audit.log(LOGGER,
+          s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0ec3bc6..4f42139 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -45,8 +45,10 @@ import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
@@ -130,7 +132,7 @@ object CarbonDataRDDFactory {
           }
       }
     } else {
-      LOGGER.audit("Not able to acquire the system level compaction lock for table " +
+      Audit.log(LOGGER, "Not able to acquire the system level compaction lock for table " +
           s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.error("Not able to acquire the compaction lock for table " +
           s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -307,7 +309,7 @@ object CarbonDataRDDFactory {
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None,
       operationContext: OperationContext): Unit = {
-    LOGGER.audit(s"Data load request has been received for table" +
+    Audit.log(LOGGER, s"Data load request has been received for table" +
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     // Check if any load need to be deleted before loading new data
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -449,7 +451,7 @@ object CarbonDataRDDFactory {
         // this means that the update doesnt have any records to update so no need to do table
         // status file updation.
         if (resultSize == 0) {
-          LOGGER.audit("Data update is successful with 0 rows updation for " +
+          Audit.log(LOGGER, "Data update is successful with 0 rows updation for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           return
         }
@@ -460,11 +462,11 @@ object CarbonDataRDDFactory {
           true,
           new util.ArrayList[Segment](0),
           new util.ArrayList[Segment](segmentFiles), "")) {
-          LOGGER.audit("Data update is successful for " +
+          Audit.log(LOGGER, "Data update is successful for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         } else {
           val errorMessage = "Data update failed due to failure in table status updation."
-          LOGGER.audit("Data update is failed for " +
+          Audit.log(LOGGER, "Data update is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           LOGGER.error("Data update failed due to failure in table status updation.")
           updateModel.get.executorErrors.errorMsg = errorMessage
@@ -486,7 +488,7 @@ object CarbonDataRDDFactory {
         clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
       }
       LOGGER.info("********clean up done**********")
-      LOGGER.audit(s"Data load is failed for " +
+      Audit.log(LOGGER, s"Data load is failed for " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       LOGGER.warn("Cannot write load metadata file as data load failed")
       throw new Exception(errorMessage)
@@ -505,7 +507,7 @@ object CarbonDataRDDFactory {
           clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
-        LOGGER.audit(s"Data load is failed for " +
+        Audit.log(LOGGER, s"Data load is failed for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         throw new Exception(status(0)._2._2.errorMsg)
       }
@@ -557,7 +559,7 @@ object CarbonDataRDDFactory {
         true
       } catch {
         case ex: Exception =>
-          LOGGER.error(ex, "Problem while committing data maps")
+          LOGGER.error("Problem while committing data maps", ex)
           false
       }
       if (!done || !commitComplete) {
@@ -573,16 +575,16 @@ object CarbonDataRDDFactory {
           clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
         }
         LOGGER.info("********clean up done**********")
-        LOGGER.audit("Data load is failed for " +
+        Audit.log(LOGGER, "Data load is failed for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error("Data load failed due to failure in table status updation.")
         throw new Exception("Data load failed due to failure in table status updation.")
       }
       if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
-        LOGGER.audit("Data load is partially successful for " +
+        Audit.log(LOGGER, "Data load is partially successful for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       } else {
-        LOGGER.audit("Data load is successful for " +
+        Audit.log(LOGGER, "Data load is successful for " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
       try {
@@ -843,7 +845,7 @@ object CarbonDataRDDFactory {
                 s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
     if (!carbonTable.isChildDataMap &&
         CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
-      LOGGER.audit(s"Compaction request received for table " +
+      Audit.log(LOGGER, s"Compaction request received for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       val compactionSize = 0
       val isCompactionTriggerByDDl = false
@@ -903,7 +905,7 @@ object CarbonDataRDDFactory {
               throw e
           }
         } else {
-          LOGGER.audit("Not able to acquire the compaction lock for table " +
+          Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
           LOGGER.error("Not able to acquire the compaction lock for table " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
@@ -946,7 +948,7 @@ object CarbonDataRDDFactory {
     if (!done) {
       val errorMessage = s"Dataload failed due to failure in table status updation for" +
                          s" ${carbonLoadModel.getTableName}"
-      LOGGER.audit("Data load is failed for " +
+      Audit.log(LOGGER, "Data load is failed for " +
                    s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
       LOGGER.error("Dataload failed due to failure in table status updation.")
       throw new Exception(errorMessage)
@@ -1087,7 +1089,7 @@ object CarbonDataRDDFactory {
       ).collect()
     } catch {
       case ex: Exception =>
-        LOGGER.error(ex, "load data failed for partition table")
+        LOGGER.error("load data failed for partition table", ex)
         throw ex
     }
   }
@@ -1120,7 +1122,7 @@ object CarbonDataRDDFactory {
       ).collect()
     } catch {
       case ex: Exception =>
-        LOGGER.error(ex, "load data frame failed")
+        LOGGER.error("load data frame failed", ex)
         throw ex
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index c505bbc..756d30c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -27,6 +27,8 @@ import scala.collection.mutable
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.metadata.SegmentFileStore
@@ -68,7 +70,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
       } catch {
         case e: Exception =>
-          LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
+          LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)
           throw e
       }
 
@@ -302,7 +304,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       // true because compaction for all datamaps will be finished at a time to the maximum level
       // possible (level 1, 2 etc). so we need to check for either condition
       if (!statusFileUpdation || !commitComplete) {
-        LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+        Audit.log(LOGGER,
+          s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                      s"${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                      s"${ carbonLoadModel.getTableName }")
@@ -310,13 +313,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
                             s" ${ carbonLoadModel.getDatabaseName }." +
                             s"${ carbonLoadModel.getTableName }")
       } else {
-        LOGGER.audit(s"Compaction request completed for table " +
+        Audit.log(LOGGER,
+          s"Compaction request completed for table " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.info(s"Compaction request completed for table " +
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       }
     } else {
-      LOGGER.audit(s"Compaction request failed for table " +
+      Audit.log(LOGGER, s"Compaction request failed for table " +
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
       )
       LOGGER.error(s"Compaction request failed for table " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 320cd78..7edc50f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -37,7 +37,7 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
     sqlContext: SQLContext,
     storeLocation: String) {
 
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def executeCompaction(): Unit
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 23323d4..1b9fb44 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -26,9 +26,11 @@ import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSour
 import org.apache.spark.sql.streaming.StreamingQuery
 import org.apache.spark.sql.types.{StructField, StructType}
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.NoSuchStreamException
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
@@ -158,7 +160,7 @@ object StreamJobManager {
         StreamJobDesc(job, streamName, sourceTable.getDatabaseName, sourceTable.getTableName,
           sinkTable.getDatabaseName, sinkTable.getTableName, query, thread))
 
-      LOGGER.audit(s"STREAM $streamName started with job id '${job.id.toString}', " +
+      Audit.log(LOGGER, s"STREAM $streamName started with job id '${job.id.toString}', " +
                    s"from ${sourceTable.getDatabaseName}.${sourceTable.getTableName} " +
                    s"to ${sinkTable.getDatabaseName}.${sinkTable.getTableName}")
       job.id.toString
@@ -179,7 +181,8 @@ object StreamJobManager {
       jobDesc.streamingQuery.stop()
       jobDesc.thread.interrupt()
       jobs.remove(streamName)
-      LOGGER.audit(s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
+      Audit.log(LOGGER,
+        s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
                    s"from ${jobDesc.sourceDb}.${jobDesc.sourceTable} " +
                    s"to ${jobDesc.sinkDb}.${jobDesc.sinkTable}")
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 838b28d..7eb6e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -170,7 +170,7 @@ class CarbonSession(@transient val sc: SparkContext,
    */
   private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
     val analyzed = qe.analyzed
-    val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOG = LogServiceFactory.getLogService(this.getClass.getName)
     analyzed match {
       case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
         if s.child.isInstanceOf[LogicalRelation] &&

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
index 1a76ed7..2d4fe84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -31,12 +32,12 @@ import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
 import org.apache.carbondata.events._
 
 class MergeBloomIndexEventListener extends OperationEventListener with Logging {
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case datamapPostEvent: BuildDataMapPostExecutionEvent =>
-        LOGGER.audit("Load post status event-listener called for merge bloom index")
+        Audit.log(LOGGER, "Load post status event-listener called for merge bloom index")
         val carbonTableIdentifier = datamapPostEvent.identifier
         val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
         val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index a0c19e9..639a0e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -23,11 +23,11 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.util.CarbonException
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -40,12 +40,12 @@ import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 import org.apache.carbondata.spark.util.CommonUtil
 
 class MergeIndexEventListener extends OperationEventListener with Logging {
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
-        LOGGER.audit("Load post status event-listener called for merge index")
+        Audit.log(LOGGER, "Load post status event-listener called for merge index")
         val loadModel = preStatusUpdateEvent.getCarbonLoadModel
         val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
         val compactedSegments = loadModel.getMergedSegmentIds
@@ -71,7 +71,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           }
         }
       case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
-        LOGGER.audit("Merge index for compaction called")
+        Audit.log(LOGGER, "Merge index for compaction called")
         val carbonTable = alterTableCompactionPostEvent.carbonTable
         val mergedLoads = alterTableCompactionPostEvent.compactedLoads
         val sparkSession = alterTableCompactionPostEvent.sparkSession
@@ -79,11 +79,10 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
           mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, mergedLoads)
         }
       case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
-        val alterTableModel = alterTableMergeIndexEvent.alterTableModel
         val carbonMainTable = alterTableMergeIndexEvent.carbonTable
         val sparkSession = alterTableMergeIndexEvent.sparkSession
         if (!carbonMainTable.isStreamingSink) {
-          LOGGER.audit(s"Compaction request received for table " +
+          Audit.log(LOGGER, s"Compaction request received for table " +
                        s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
           LOGGER.info(s"Merge Index request received for table " +
                       s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
@@ -129,7 +128,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               clearBlockDataMapCache(carbonMainTable, validSegmentIds)
               val requestMessage = "Compaction request completed for table " +
                 s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
-              LOGGER.audit(requestMessage)
+              Audit.log(LOGGER, requestMessage)
               LOGGER.info(requestMessage)
             } else {
               val lockMessage = "Not able to acquire the compaction lock for table " +
@@ -138,7 +137,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                                     .getTableName
                                 }"
 
-              LOGGER.audit(lockMessage)
+              Audit.log(LOGGER, lockMessage)
               LOGGER.error(lockMessage)
               CarbonException.analysisException(
                 "Table is already locked for compaction. Please try after some time.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 66f9e47..081482c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion
@@ -151,7 +153,7 @@ case class CarbonCreateDataMapCommand(
         systemFolderLocation, tableIdentifier, dmProviderName)
     OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
       operationContext)
-    LOGGER.audit(s"DataMap $dataMapName successfully added")
+    Audit.log(LOGGER, s"DataMap $dataMapName successfully added")
     Seq.empty
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 4607de0..67e2dee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -52,7 +53,7 @@ case class CarbonDropDataMapCommand(
     forceDrop: Boolean = false)
   extends AtomicRunnableCommand {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   private var dataMapProvider: DataMapProvider = _
   var mainTable: CarbonTable = _
   var dataMapSchema: DataMapSchema = _
@@ -111,7 +112,7 @@ case class CarbonDropDataMapCommand(
         locksToBeAcquired foreach {
           lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
         }
-        LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
+        Audit.log(LOGGER, s"Deleting datamap [$dataMapName] under table [$tableName]")
 
         // drop index,mv datamap on the main table.
         if (mainTable != null &&
@@ -172,7 +173,7 @@ case class CarbonDropDataMapCommand(
         case e: NoSuchDataMapException =>
           throw e
         case ex: Exception =>
-          LOGGER.error(ex, s"Dropping datamap $dataMapName failed")
+          LOGGER.error(s"Dropping datamap $dataMapName failed", ex)
           throwMetadataException(dbName, tableName,
             s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index b699ec1..8e338db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -25,14 +25,15 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CompactionModel}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -63,7 +64,7 @@ case class CarbonAlterTableCompactionCommand(
   var table: CarbonTable = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val tableName = alterTableModel.tableName.toLowerCase
     val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     table = if (tableInfoOp.isDefined) {
@@ -204,7 +205,7 @@ case class CarbonAlterTableCompactionCommand(
       storeLocation: String,
       compactedSegments: java.util.List[String],
       operationContext: OperationContext): Unit = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
@@ -216,7 +217,7 @@ case class CarbonAlterTableCompactionCommand(
       }
     }
 
-    LOGGER.audit(s"Compaction request received for table " +
+    Audit.log(LOGGER, s"Compaction request received for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
@@ -313,7 +314,7 @@ case class CarbonAlterTableCompactionCommand(
             throw e
         }
       } else {
-        LOGGER.audit("Not able to acquire the compaction lock for table " +
+        Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
                      s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.error(s"Not able to acquire the compaction lock for table" +
                      s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -328,7 +329,7 @@ case class CarbonAlterTableCompactionCommand(
       operationContext: OperationContext,
       sparkSession: SparkSession
   ): Unit = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     // 1. delete the lock of streaming.lock, forcing the stream to be closed
     val streamingLock = CarbonLockFactory.getCarbonLockObj(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
index a477167..ba20773 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -33,7 +33,7 @@ case class CarbonAlterTableFinishStreaming(
   extends MetadataCommand {
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession)
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val streamingLock = CarbonLockFactory.getCarbonLockObj(
       carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
       LockUsage.STREAMING_LOCK)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index e561a5a..a390191 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -147,7 +147,7 @@ case class CarbonCleanFilesCommand(
       case e: Throwable =>
         // catch all exceptions to avoid failure
         LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-          .error(e, "Failed to clean in progress segments")
+          .error("Failed to clean in progress segments", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 7cf8c1e..ee0f5ab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -36,7 +36,7 @@ case class CarbonInsertIntoCommand(
   var loadCommand: CarbonLoadDataCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     def containsLimit(plan: LogicalPlan): Boolean = {
       plan find {
         case limit: GlobalLimit => true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 43c8b86..22d0bb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
@@ -48,7 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -109,7 +111,7 @@ case class CarbonLoadDataCommand(
   var parentTablePath: String = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     table = if (tableInfoOp.isDefined) {
         CarbonTable.buildFromTableInfo(tableInfoOp.get)
@@ -121,7 +123,7 @@ case class CarbonLoadDataCommand(
         }
         if (null == relation.carbonTable) {
           LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
-          LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+          Audit.log(LOGGER, s"Data loading failed. table not found: $dbName.$tableName")
           throw new NoSuchTableException(dbName, tableName)
         }
         relation.carbonTable
@@ -150,7 +152,7 @@ case class CarbonLoadDataCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     var concurrentLoadLock: Option[ICarbonLock] = None
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -341,7 +343,7 @@ case class CarbonLoadDataCommand(
           if (isUpdateTableStatusRequired) {
             CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
           }
-          LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
+          LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
           throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
         // In case of event related exception
         case preEventEx: PreEventException =>
@@ -352,7 +354,7 @@ case class CarbonLoadDataCommand(
           if (isUpdateTableStatusRequired) {
             CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
           }
-          LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
+          Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. Please check the logs")
           throw ex
       } finally {
         releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
@@ -369,7 +371,7 @@ case class CarbonLoadDataCommand(
         } catch {
           case ex: Exception =>
             LOGGER.error(ex)
-            LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
+            Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. " +
                          "Problem deleting the partition folder")
             throw ex
         }
@@ -377,10 +379,10 @@ case class CarbonLoadDataCommand(
       }
     } catch {
       case dle: DataLoadingException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
+        Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
         throw dle
       case mce: MalformedCarbonCommandException =>
-        LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
+        Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
         throw mce
     }
     Seq.empty
@@ -412,7 +414,7 @@ case class CarbonLoadDataCommand(
   }
 
   private def releaseConcurrentLoadLock(concurrentLoadLock: Option[ICarbonLock],
-      LOGGER: LogService): Unit = {
+      LOGGER: Logger): Unit = {
     if (concurrentLoadLock.isDefined) {
       if (concurrentLoadLock.get.unlock()) {
         LOGGER.info("concurrent_load lock for table" + table.getTablePath +
@@ -432,7 +434,7 @@ case class CarbonLoadDataCommand(
       partitionStatus: SegmentStatus,
       hadoopConf: Configuration,
       operationContext: OperationContext,
-      LOGGER: LogService): Seq[Row] = {
+      LOGGER: Logger): Seq[Row] = {
     var rows = Seq.empty[Row]
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -561,7 +563,7 @@ case class CarbonLoadDataCommand(
       partitionStatus: SegmentStatus,
       hadoopConf: Configuration,
       operationContext: OperationContext,
-      LOGGER: LogService): Seq[Row] = {
+      LOGGER: Logger): Seq[Row] = {
     var rows = Seq.empty[Row]
     val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
       val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
@@ -615,9 +617,8 @@ case class CarbonLoadDataCommand(
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame],
       operationContext: OperationContext,
-      LOGGER: LogService): Seq[Row] = {
+      LOGGER: Logger): Seq[Row] = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
     val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
     var timeStampformatString = carbonLoadModel.getTimestampformat
     if (timeStampformatString.isEmpty) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index cf88fb9..39e85ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -47,8 +48,7 @@ case class RefreshCarbonTableCommand(
     databaseNameOp: Option[String],
     tableName: String)
   extends MetadataCommand {
-  val LOGGER: LogService =
-    LogServiceFactory.getLogService(this.getClass.getName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -88,7 +88,7 @@ case class RefreshCarbonTableCommand(
             val msg = s"Table registration with Database name [$databaseName] and Table name " +
                       s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
                       s" not copied under database [$databaseName]"
-            LOGGER.audit(msg)
+            Audit.log(LOGGER, msg)
             throwMetadataException(databaseName, tableName, msg)
           }
           // 2.2.1 Register the aggregate tables to hive
@@ -101,14 +101,14 @@ case class RefreshCarbonTableCommand(
           registerAllPartitionsToHive(identifier, sparkSession)
         }
       } else {
-        LOGGER.audit(
+        Audit.log(LOGGER,
           s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
           s"failed." +
           s"Table [$tableName] either non carbon table or stale carbon table under database " +
           s"[$databaseName]")
       }
     } else {
-      LOGGER.audit(
+      Audit.log(LOGGER,
         s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
         s"failed." +
         s"Table [$tableName] either already exists or registered under database [$databaseName]")
@@ -154,7 +154,7 @@ case class RefreshCarbonTableCommand(
       OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
       CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
         .run(sparkSession)
-      LOGGER.audit(s"Table registration with Database name [$dbName] and Table name " +
+      Audit.log(LOGGER, s"Table registration with Database name [$dbName] and Table name " +
                    s"[$tableName] is successful.")
     } catch {
       case e: AnalysisException => throw e

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 0127d7e..053937b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -21,8 +21,10 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 
+import org.apache.carbondata.api.CarbonStore.LOGGER
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -79,7 +81,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
     var lockStatus = false
     try {
       lockStatus = metadataLock.lockWithRetries()
-      LOGGER.audit(s" Delete data request has been received " +
+      Audit.log(LOGGER, s" Delete data request has been received " +
                    s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
       if (lockStatus) {
         LOGGER.info("Successfully able to get the table metadata file lock")
@@ -119,7 +121,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
 
       case e: Exception =>
-        LOGGER.error(e, "Exception in Delete data operation " + e.getMessage)
+        LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
         // ****** start clean up.
         // In case of failure , clean all related delete delta files
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 4e9c1af..31e1779 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -163,7 +163,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
 
       case e: Exception =>
-        LOGGER.error(e, "Exception in update operation")
+        LOGGER.error("Exception in update operation", e)
         // ****** start clean up.
         // In case of failure , clean all related delete delta files
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 7e7f671..d118539 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -20,9 +20,7 @@ package org.apache.spark.sql.execution.command.mutation
 import java.util
 
 import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
@@ -34,7 +32,9 @@ import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -46,13 +46,12 @@ import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
 
 object DeleteExecution {
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   /**
    * generate the delete delta files in each segment as per the RDD.
@@ -167,7 +166,7 @@ object DeleteExecution {
           } else {
             // In case of failure , clean all related delete delta files
             CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
-            LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+            Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
             val errorMsg =
               "Delete data operation is failed due to failure in creating delete delta file for " +
               "segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
@@ -202,7 +201,7 @@ object DeleteExecution {
               listOfSegmentToBeMarkedDeleted)
       ) {
         LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
-        LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
+        Audit.log(LOGGER, s"Delete data operation is successful for ${ database }.${ tableName }")
       }
       else {
         // In case of failure , clean all related delete delta files
@@ -210,7 +209,7 @@ object DeleteExecution {
 
         val errorMessage = "Delete data operation is failed due to failure " +
                            "in table status updation."
-        LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+        Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
         LOGGER.error("Delete data operation is failed due to failure in table status updation.")
         executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
         executorErrors.errorMsg = errorMessage
@@ -291,12 +290,12 @@ object DeleteExecution {
           deleteStatus = SegmentStatus.SUCCESS
         } catch {
           case e : MultipleMatchingException =>
-            LOGGER.audit(e.getMessage)
+            Audit.log(LOGGER, e.getMessage)
             LOGGER.error(e.getMessage)
           // dont throw exception here.
           case e: Exception =>
             val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
-            LOGGER.audit(errorMsg)
+            Audit.log(LOGGER, errorMsg)
             LOGGER.error(errorMsg + e.getMessage)
             throw e
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 35fc3c3..3472d8a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -25,21 +25,20 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql._
 import org.apache.spark.sql.execution.command.AlterTableModel
 import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.util.SparkSQLUtil
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
 
 object HorizontalCompaction {
 
-  val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  val LOG = LogServiceFactory.getLogService(this.getClass.getName)
 
   /**
    * The method does horizontal compaction. After Update and Delete completion
@@ -131,7 +130,7 @@ object HorizontalCompaction {
     }
 
     LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].")
-    LOG.audit(s"Horizontal Update Compaction operation started for [$db.$table].")
+    Audit.log(LOG, s"Horizontal Update Compaction operation started for [$db.$table].")
 
     try {
       // Update Compaction.
@@ -155,7 +154,7 @@ object HorizontalCompaction {
           s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
     }
     LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
-    LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
+    Audit.log(LOG, s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
   }
 
   /**
@@ -181,7 +180,7 @@ object HorizontalCompaction {
     }
 
     LOG.info(s"Horizontal Delete Compaction operation started for [$db.$table].")
-    LOG.audit(s"Horizontal Delete Compaction operation started for [$db.$table].")
+    Audit.log(LOG, s"Horizontal Delete Compaction operation started for [$db.$table].")
 
     try {
 
@@ -226,7 +225,7 @@ object HorizontalCompaction {
         timestamp.toString,
         segmentUpdateStatusManager)
       if (updateStatus == false) {
-        LOG.audit(s"Delete Compaction data operation is failed for [$db.$table].")
+        Audit.log(LOG, s"Delete Compaction data operation is failed for [$db.$table].")
         LOG.error("Delete Compaction data operation is failed.")
         throw new HorizontalCompactionException(
           s"Horizontal Delete Compaction Failed for [$db.$table] ." +
@@ -234,7 +233,7 @@ object HorizontalCompaction {
       }
       else {
         LOG.info(s"Horizontal Delete Compaction operation completed for [$db.$table].")
-        LOG.audit(s"Horizontal Delete Compaction operation completed for [$db.$table].")
+        Audit.log(LOG, s"Horizontal Delete Compaction operation completed for [$db.$table].")
       }
     }
     catch {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b76a485..c230322 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.AlterTableUtil
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -48,7 +49,7 @@ case class CarbonAlterTableDropPartitionCommand(
     model: AlterTableDropPartitionModel)
   extends AtomicRunnableCommand {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
   private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -121,7 +122,7 @@ case class CarbonAlterTableDropPartitionCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
     val tableName = model.tableName
     var locks = List.empty[ICarbonLock]
@@ -168,7 +169,7 @@ case class CarbonAlterTableDropPartitionCommand(
       LOGGER.info("Locks released after alter table drop partition action.")
     }
     LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
-    LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
+    Audit.log(LOGGER, s"Alter table drop partition is successful for table $dbName.$tableName")
     Seq.empty
   }
 
@@ -177,7 +178,7 @@ case class CarbonAlterTableDropPartitionCommand(
       carbonLoadModel: CarbonLoadModel,
       dropWithData: Boolean,
       oldPartitionIds: List[Int]): Unit = {
-    LOGGER.audit(s"Drop partition request received for table " +
+    Audit.log(LOGGER, s"Drop partition request received for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     try {
       startDropThreads(
@@ -246,7 +247,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
     dropWithData: Boolean,
     oldPartitionIds: List[Int]) extends Thread {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def run(): Unit = {
     try {
@@ -254,8 +255,8 @@ case class dropPartitionThread(sqlContext: SQLContext,
         segmentId, partitionId, dropWithData, oldPartitionIds)
     } catch {
       case e: Exception =>
-        val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
-        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }")
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+        LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }", e)
     }
   }
 
@@ -274,7 +275,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
       future.get
     } catch {
       case e: Exception =>
-        LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+        LOGGER.error(s"Exception in partition drop thread ${ e.getMessage }", e)
         throw e
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 753abaf..8b337c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -54,7 +55,7 @@ case class CarbonAlterTableSplitPartitionCommand(
     splitPartitionModel: AlterTableSplitPartitionModel)
   extends AtomicRunnableCommand {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
   private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -182,11 +183,12 @@ case class CarbonAlterTableSplitPartitionCommand(
     } finally {
       AlterTableUtil.releaseLocks(locks)
       CacheProvider.getInstance().dropAllCache()
-      val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+      val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
       LOGGER.info("Locks released after alter table add/split partition action.")
       if (success) {
         LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
-        LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+        Audit.log(LOGGER,
+          s"Alter table add/split partition is successful for table $dbName.$tableName")
       }
     }
     Seq.empty
@@ -198,7 +200,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       carbonLoadModel: CarbonLoadModel,
       oldPartitionIdList: List[Int]
   ): Unit = {
-    LOGGER.audit(s"Add partition request received for table " +
+    Audit.log(LOGGER, s"Add partition request received for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     try {
       startSplitThreads(sqlContext,
@@ -264,7 +266,7 @@ case class SplitThread(sqlContext: SQLContext,
     partitionId: String,
     oldPartitionIdList: List[Int]) extends Thread {
 
-  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def run(): Unit = {
     var triggeredSplitPartitionStatus = false
@@ -275,7 +277,7 @@ case class SplitThread(sqlContext: SQLContext,
       triggeredSplitPartitionStatus = true
     } catch {
       case e: Exception =>
-        val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
         LOGGER.error(s"Exception in partition split thread: ${ e.getMessage } }")
         exception = e
     }
@@ -301,7 +303,7 @@ case class SplitThread(sqlContext: SQLContext,
       }
     } catch {
       case e: Exception =>
-        LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+        LOGGER.error(s"Exception in partition split thread ${ e.getMessage }", e)
         throw e
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index b33652f..f606c04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -64,7 +64,7 @@ object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
 
 trait CommitHelper {
 
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
       operationContext: OperationContext,
@@ -586,7 +586,7 @@ object CommitPreAggregateListener extends OperationEventListener with CommitHelp
     } catch {
       case e: Exception =>
         operationContext.setProperty("commitComplete", false)
-        LOGGER.error(e, "Problem while committing data maps")
+        LOGGER.error("Problem while committing data maps", e)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index f26d1cb..d16f570 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -419,11 +419,10 @@ object PreAggregateUtil {
    */
   def updateMainTable(carbonTable: CarbonTable,
       childSchema: DataMapSchema, sparkSession: SparkSession): TableInfo = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
-    var numberOfCurrentChild: Int = 0
     val dbName = carbonTable.getDatabaseName
     val tableName = carbonTable.getTableName
     try {
@@ -450,7 +449,7 @@ object PreAggregateUtil {
       thriftTableInfo
     } catch {
       case e: Exception =>
-        LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
+        LOGGER.error("Pre Aggregate Parent table update failed reverting changes", e)
         throw e
     } finally {
       // release lock after command execution completion