You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2018/10/18 01:56:57 UTC
[3/6] carbondata git commit: [CARBONDATA-3024] Refactor to use log4j
Logger directly
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 57b2e44..3f0eb71 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -28,7 +28,9 @@ import org.apache.spark.{Partition, SerializableWritable, SparkContext, TaskCont
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block.SegmentProperties
@@ -335,7 +337,7 @@ object StreamHandoffRDD {
} catch {
case ex: Exception =>
loadStatus = SegmentStatus.LOAD_FAILURE
- LOGGER.error(ex, s"Handoff failed on streaming segment $handoffSegmenId")
+ LOGGER.error(s"Handoff failed on streaming segment $handoffSegmenId", ex)
errorMessage = errorMessage + ": " + ex.getCause.getMessage
LOGGER.error(errorMessage)
}
@@ -345,7 +347,7 @@ object StreamHandoffRDD {
LOGGER.info("********starting clean up**********")
CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
LOGGER.info("********clean up done**********")
- LOGGER.audit(s"Handoff is failed for " +
+ Audit.log(LOGGER, s"Handoff is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Cannot write load metadata file as handoff failed")
throw new Exception(errorMessage)
@@ -367,7 +369,7 @@ object StreamHandoffRDD {
.fireEvent(loadTablePostStatusUpdateEvent, operationContext)
if (!done) {
val errorMessage = "Handoff failed due to failure in table status updation."
- LOGGER.audit("Handoff is failed for " +
+ Audit.log(LOGGER, "Handoff is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Handoff failed due to failure in table status updation.")
throw new Exception(errorMessage)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 2cc2a5b..1e8d148 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
import scala.util.Try
import com.univocity.parsers.common.TextParsingException
+import org.apache.log4j.Logger
import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
@@ -363,7 +364,7 @@ object CarbonScalaUtil {
/**
* Retrieve error message from exception
*/
- def retrieveAndLogErrorMsg(ex: Throwable, logger: LogService): (String, String) = {
+ def retrieveAndLogErrorMsg(ex: Throwable, logger: Logger): (String, String) = {
var errorMessage = "DataLoad failure"
var executorMessage = ""
if (ex != null) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 67c4c9b..704382f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -696,17 +696,17 @@ object GlobalDictionaryUtil {
} catch {
case ex: Exception =>
if (ex.getCause != null && ex.getCause.isInstanceOf[NoRetryException]) {
- LOGGER.error(ex.getCause, "generate global dictionary failed")
+ LOGGER.error("generate global dictionary failed", ex.getCause)
throw new Exception("generate global dictionary failed, " +
ex.getCause.getMessage)
}
ex match {
case spx: SparkException =>
- LOGGER.error(spx, "generate global dictionary failed")
+ LOGGER.error("generate global dictionary failed", spx)
throw new Exception("generate global dictionary failed, " +
trimErrorMessage(spx.getMessage))
case _ =>
- LOGGER.error(ex, "generate global dictionary failed")
+ LOGGER.error("generate global dictionary failed", ex)
throw ex
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 5e0fe8b..e1dd0af 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,7 +29,9 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.util.CarbonException
+import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -280,7 +282,7 @@ class AlterTableColumnSchemaGenerator(
.foreach(f => if (f._2.size > 1) {
val name = f._1
LOGGER.error(s"Duplicate column found with name: $name")
- LOGGER.audit(
+ Audit.log(LOGGER,
s"Validation failed for Create/Alter Table Operation " +
s"for ${ dbName }.${ alterTableModel.tableName }. " +
s"Duplicate column found with name: $name")
@@ -289,7 +291,7 @@ class AlterTableColumnSchemaGenerator(
if (newCols.exists(_.getDataType.isComplexType)) {
LOGGER.error(s"Complex column cannot be added")
- LOGGER.audit(
+ Audit.log(LOGGER,
s"Validation failed for Create/Alter Table Operation " +
s"for ${ dbName }.${ alterTableModel.tableName }. " +
s"Complex column cannot be added")
@@ -780,7 +782,7 @@ class TableNewProcessor(cm: TableModel) {
if (f._2.size > 1) {
val name = f._1
LOGGER.error(s"Duplicate column found with name: $name")
- LOGGER.audit(
+ Audit.log(LOGGER,
s"Validation failed for Create/Alter Table Operation " +
s"Duplicate column found with name: $name")
CarbonException.analysisException(s"Duplicate dimensions found with name: $name")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
index 6d93b34..2fdbc86 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala
@@ -283,7 +283,7 @@ object CarbonAppendableStreamSink {
case t: Throwable =>
val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
StreamSegment.recoverSegmentIfRequired(segmentDir)
- LOGGER.error(t, s"Aborting job ${ job.getJobID }.")
+ LOGGER.error(s"Aborting job ${ job.getJobID }.", t)
committer.abortJob(job)
throw new CarbonStreamException("Job failed to write data file", t)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
index e5552db..87106e0 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/test/ResourceRegisterAndCopier.scala
@@ -24,7 +24,9 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.io.IOUtils
+import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.HdfsFileLock
import org.apache.carbondata.core.util.CarbonUtil
@@ -48,7 +50,7 @@ object ResourceRegisterAndCopier {
if (!file.exists()) {
sys.error(s"""Provided path $hdfsPath does not exist""")
}
- LOGGER.audit("Try downloading resource data")
+ Audit.log(LOGGER, "Try downloading resource data")
val lock = new HdfsFileLock(hdfsPath, "/resource.lock")
var bool = false
try {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
index 6acf31f..5121027 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -23,7 +23,6 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
-import org.apache.parquet.column.Encoding;
import org.apache.spark.sql.CarbonVectorProxy;
import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
import org.apache.spark.sql.types.Decimal;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
index 67ea497..779c62f 100644
--- a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.common.logging.LogService;
+import org.apache.log4j.Logger;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.cache.dictionary.Dictionary;
import org.apache.carbondata.core.datastore.block.TableBlockInfo;
@@ -65,7 +65,7 @@ import org.apache.spark.sql.types.StructType;
*/
public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
- private static final LogService LOGGER =
+ private static final Logger LOGGER =
LogServiceFactory.getLogService(VectorizedCarbonRecordReader.class.getName());
private int batchIdx = 0;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index e3fec10..3f486d0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -403,7 +403,7 @@ class IndexDataMapRebuildRDD[K, V](
reader.close()
} catch {
case ex: Throwable =>
- LOGGER.error(ex, "Failed to close reader")
+ LOGGER.error("Failed to close reader", ex)
}
}
@@ -412,7 +412,7 @@ class IndexDataMapRebuildRDD[K, V](
refresher.close()
} catch {
case ex: Throwable =>
- LOGGER.error(ex, "Failed to close index writer")
+ LOGGER.error("Failed to close index writer", ex)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
index 82bae8e..a0bdd64 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command.CompactionModel
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -130,8 +132,8 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
carbonLoadModel.getTableName)
LOGGER
.info(s"Compaction request for datamap ${ carbonTable.getTableUniqueName } is successful")
- LOGGER
- .audit(s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful")
+ Audit.log(LOGGER,
+ s"Compaction request for datamap ${carbonTable.getTableUniqueName} is successful")
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0ec3bc6..4f42139 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -45,8 +45,10 @@ import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.{CarbonException, SparkSQLUtil}
+import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.constants.LoggerAction
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
@@ -130,7 +132,7 @@ object CarbonDataRDDFactory {
}
}
} else {
- LOGGER.audit("Not able to acquire the system level compaction lock for table " +
+ Audit.log(LOGGER, "Not able to acquire the system level compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -307,7 +309,7 @@ object CarbonDataRDDFactory {
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None,
operationContext: OperationContext): Unit = {
- LOGGER.audit(s"Data load request has been received for table" +
+ Audit.log(LOGGER, s"Data load request has been received for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
// Check if any load need to be deleted before loading new data
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -449,7 +451,7 @@ object CarbonDataRDDFactory {
// this means that the update doesnt have any records to update so no need to do table
// status file updation.
if (resultSize == 0) {
- LOGGER.audit("Data update is successful with 0 rows updation for " +
+ Audit.log(LOGGER, "Data update is successful with 0 rows updation for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
return
}
@@ -460,11 +462,11 @@ object CarbonDataRDDFactory {
true,
new util.ArrayList[Segment](0),
new util.ArrayList[Segment](segmentFiles), "")) {
- LOGGER.audit("Data update is successful for " +
+ Audit.log(LOGGER, "Data update is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
} else {
val errorMessage = "Data update failed due to failure in table status updation."
- LOGGER.audit("Data update is failed for " +
+ Audit.log(LOGGER, "Data update is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Data update failed due to failure in table status updation.")
updateModel.get.executorErrors.errorMsg = errorMessage
@@ -486,7 +488,7 @@ object CarbonDataRDDFactory {
clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
}
LOGGER.info("********clean up done**********")
- LOGGER.audit(s"Data load is failed for " +
+ Audit.log(LOGGER, s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.warn("Cannot write load metadata file as data load failed")
throw new Exception(errorMessage)
@@ -505,7 +507,7 @@ object CarbonDataRDDFactory {
clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
}
LOGGER.info("********clean up done**********")
- LOGGER.audit(s"Data load is failed for " +
+ Audit.log(LOGGER, s"Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
throw new Exception(status(0)._2._2.errorMsg)
}
@@ -557,7 +559,7 @@ object CarbonDataRDDFactory {
true
} catch {
case ex: Exception =>
- LOGGER.error(ex, "Problem while committing data maps")
+ LOGGER.error("Problem while committing data maps", ex)
false
}
if (!done || !commitComplete) {
@@ -573,16 +575,16 @@ object CarbonDataRDDFactory {
clearDataMapFiles(carbonTable, carbonLoadModel.getSegmentId)
}
LOGGER.info("********clean up done**********")
- LOGGER.audit("Data load is failed for " +
+ Audit.log(LOGGER, "Data load is failed for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error("Data load failed due to failure in table status updation.")
throw new Exception("Data load failed due to failure in table status updation.")
}
if (SegmentStatus.LOAD_PARTIAL_SUCCESS == loadStatus) {
- LOGGER.audit("Data load is partially successful for " +
+ Audit.log(LOGGER, "Data load is partially successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
} else {
- LOGGER.audit("Data load is successful for " +
+ Audit.log(LOGGER, "Data load is successful for " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
try {
@@ -843,7 +845,7 @@ object CarbonDataRDDFactory {
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
if (!carbonTable.isChildDataMap &&
CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
- LOGGER.audit(s"Compaction request received for table " +
+ Audit.log(LOGGER, s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val compactionSize = 0
val isCompactionTriggerByDDl = false
@@ -903,7 +905,7 @@ object CarbonDataRDDFactory {
throw e
}
} else {
- LOGGER.audit("Not able to acquire the compaction lock for table " +
+ Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
LOGGER.error("Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
@@ -946,7 +948,7 @@ object CarbonDataRDDFactory {
if (!done) {
val errorMessage = s"Dataload failed due to failure in table status updation for" +
s" ${carbonLoadModel.getTableName}"
- LOGGER.audit("Data load is failed for " +
+ Audit.log(LOGGER, "Data load is failed for " +
s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
LOGGER.error("Dataload failed due to failure in table status updation.")
throw new Exception(errorMessage)
@@ -1087,7 +1089,7 @@ object CarbonDataRDDFactory {
).collect()
} catch {
case ex: Exception =>
- LOGGER.error(ex, "load data failed for partition table")
+ LOGGER.error("load data failed for partition table", ex)
throw ex
}
}
@@ -1120,7 +1122,7 @@ object CarbonDataRDDFactory {
).collect()
} catch {
case ex: Exception =>
- LOGGER.error(ex, "load data frame failed")
+ LOGGER.error("load data frame failed", ex)
throw ex
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index c505bbc..756d30c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -27,6 +27,8 @@ import scala.collection.mutable
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.metadata.SegmentFileStore
@@ -68,7 +70,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
scanSegmentsAndSubmitJob(loadsToMerge, compactedSegments)
} catch {
case e: Exception =>
- LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
+ LOGGER.error(s"Exception in compaction thread ${ e.getMessage }", e)
throw e
}
@@ -302,7 +304,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
// true because compaction for all datamaps will be finished at a time to the maximum level
// possible (level 1, 2 etc). so we need to check for either condition
if (!statusFileUpdation || !commitComplete) {
- LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+ Audit.log(LOGGER,
+ s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
@@ -310,13 +313,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
s" ${ carbonLoadModel.getDatabaseName }." +
s"${ carbonLoadModel.getTableName }")
} else {
- LOGGER.audit(s"Compaction request completed for table " +
+ Audit.log(LOGGER,
+ s"Compaction request completed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.info(s"Compaction request completed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
}
} else {
- LOGGER.audit(s"Compaction request failed for table " +
+ Audit.log(LOGGER, s"Compaction request failed for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
)
LOGGER.error(s"Compaction request failed for table " +
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 320cd78..7edc50f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -37,7 +37,7 @@ abstract class Compactor(carbonLoadModel: CarbonLoadModel,
sqlContext: SQLContext,
storeLocation: String) {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def executeCompaction(): Unit
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
index 23323d4..1b9fb44 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/stream/StreamJobManager.scala
@@ -26,9 +26,11 @@ import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSour
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.NoSuchStreamException
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
@@ -158,7 +160,7 @@ object StreamJobManager {
StreamJobDesc(job, streamName, sourceTable.getDatabaseName, sourceTable.getTableName,
sinkTable.getDatabaseName, sinkTable.getTableName, query, thread))
- LOGGER.audit(s"STREAM $streamName started with job id '${job.id.toString}', " +
+ Audit.log(LOGGER, s"STREAM $streamName started with job id '${job.id.toString}', " +
s"from ${sourceTable.getDatabaseName}.${sourceTable.getTableName} " +
s"to ${sinkTable.getDatabaseName}.${sinkTable.getTableName}")
job.id.toString
@@ -179,7 +181,8 @@ object StreamJobManager {
jobDesc.streamingQuery.stop()
jobDesc.thread.interrupt()
jobs.remove(streamName)
- LOGGER.audit(s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
+ Audit.log(LOGGER,
+ s"STREAM $streamName stopped, job id '${jobDesc.streamingQuery.id.toString}', " +
s"from ${jobDesc.sourceDb}.${jobDesc.sourceTable} " +
s"to ${jobDesc.sinkDb}.${jobDesc.sinkTable}")
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 838b28d..7eb6e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -170,7 +170,7 @@ class CarbonSession(@transient val sc: SparkContext,
*/
private def trySearchMode(qe: QueryExecution, sse: SQLStart): DataFrame = {
val analyzed = qe.analyzed
- val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOG = LogServiceFactory.getLogService(this.getClass.getName)
analyzed match {
case _@Project(columns, _@Filter(expr, s: SubqueryAlias))
if s.child.isInstanceOf[LogicalRelation] &&
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
index 1a76ed7..2d4fe84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeBloomIndexEventListener.scala
@@ -23,7 +23,8 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -31,12 +32,12 @@ import org.apache.carbondata.datamap.CarbonMergeBloomIndexFilesRDD
import org.apache.carbondata.events._
class MergeBloomIndexEventListener extends OperationEventListener with Logging {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
event match {
case datamapPostEvent: BuildDataMapPostExecutionEvent =>
- LOGGER.audit("Load post status event-listener called for merge bloom index")
+ Audit.log(LOGGER, "Load post status event-listener called for merge bloom index")
val carbonTableIdentifier = datamapPostEvent.identifier
val carbonTable = DataMapStoreManager.getInstance().getCarbonTable(carbonTableIdentifier)
val tableDataMaps = DataMapStoreManager.getInstance().getAllDataMap(carbonTable)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index a0c19e9..639a0e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -23,11 +23,11 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.internal.Logging
-import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.util.CarbonException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
@@ -40,12 +40,12 @@ import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
import org.apache.carbondata.spark.util.CommonUtil
class MergeIndexEventListener extends OperationEventListener with Logging {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
event match {
case preStatusUpdateEvent: LoadTablePostExecutionEvent =>
- LOGGER.audit("Load post status event-listener called for merge index")
+ Audit.log(LOGGER, "Load post status event-listener called for merge index")
val loadModel = preStatusUpdateEvent.getCarbonLoadModel
val carbonTable = loadModel.getCarbonDataLoadSchema.getCarbonTable
val compactedSegments = loadModel.getMergedSegmentIds
@@ -71,7 +71,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
}
}
case alterTableCompactionPostEvent: AlterTableCompactionPostEvent =>
- LOGGER.audit("Merge index for compaction called")
+ Audit.log(LOGGER, "Merge index for compaction called")
val carbonTable = alterTableCompactionPostEvent.carbonTable
val mergedLoads = alterTableCompactionPostEvent.compactedLoads
val sparkSession = alterTableCompactionPostEvent.sparkSession
@@ -79,11 +79,10 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
mergeIndexFilesForCompactedSegments(sparkSession, carbonTable, mergedLoads)
}
case alterTableMergeIndexEvent: AlterTableMergeIndexEvent =>
- val alterTableModel = alterTableMergeIndexEvent.alterTableModel
val carbonMainTable = alterTableMergeIndexEvent.carbonTable
val sparkSession = alterTableMergeIndexEvent.sparkSession
if (!carbonMainTable.isStreamingSink) {
- LOGGER.audit(s"Compaction request received for table " +
+ Audit.log(LOGGER, s"Compaction request received for table " +
s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
LOGGER.info(s"Merge Index request received for table " +
s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }")
@@ -129,7 +128,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
clearBlockDataMapCache(carbonMainTable, validSegmentIds)
val requestMessage = "Compaction request completed for table " +
s"${ carbonMainTable.getDatabaseName }.${ carbonMainTable.getTableName }"
- LOGGER.audit(requestMessage)
+ Audit.log(LOGGER, requestMessage)
LOGGER.info(requestMessage)
} else {
val lockMessage = "Not able to acquire the compaction lock for table " +
@@ -138,7 +137,7 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
.getTableName
}"
- LOGGER.audit(lockMessage)
+ Audit.log(LOGGER, lockMessage)
LOGGER.error(lockMessage)
CarbonException.analysisException(
"Table is already locked for compaction. Please try after some time.")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 66f9e47..081482c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,8 +22,10 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
+import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.metadata.ColumnarFormatVersion
@@ -151,7 +153,7 @@ case class CarbonCreateDataMapCommand(
systemFolderLocation, tableIdentifier, dmProviderName)
OperationListenerBus.getInstance().fireEvent(createDataMapPostExecutionEvent,
operationContext)
- LOGGER.audit(s"DataMap $dataMapName successfully added")
+ Audit.log(LOGGER, s"DataMap $dataMapName successfully added")
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 4607de0..67e2dee 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -28,7 +28,8 @@ import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -52,7 +53,7 @@ case class CarbonDropDataMapCommand(
forceDrop: Boolean = false)
extends AtomicRunnableCommand {
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
private var dataMapProvider: DataMapProvider = _
var mainTable: CarbonTable = _
var dataMapSchema: DataMapSchema = _
@@ -111,7 +112,7 @@ case class CarbonDropDataMapCommand(
locksToBeAcquired foreach {
lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
}
- LOGGER.audit(s"Deleting datamap [$dataMapName] under table [$tableName]")
+ Audit.log(LOGGER, s"Deleting datamap [$dataMapName] under table [$tableName]")
// drop index,mv datamap on the main table.
if (mainTable != null &&
@@ -172,7 +173,7 @@ case class CarbonDropDataMapCommand(
case e: NoSuchDataMapException =>
throw e
case ex: Exception =>
- LOGGER.error(ex, s"Dropping datamap $dataMapName failed")
+ LOGGER.error(s"Dropping datamap $dataMapName failed", ex)
throwMetadataException(dbName, tableName,
s"Dropping datamap $dataMapName failed: ${ ex.getMessage }")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index b699ec1..8e338db 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -25,14 +25,15 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CompactionModel}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -63,7 +64,7 @@ case class CarbonAlterTableCompactionCommand(
var table: CarbonTable = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val tableName = alterTableModel.tableName.toLowerCase
val dbName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
table = if (tableInfoOp.isDefined) {
@@ -204,7 +205,7 @@ case class CarbonAlterTableCompactionCommand(
storeLocation: String,
compactedSegments: java.util.List[String],
operationContext: OperationContext): Unit = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
@@ -216,7 +217,7 @@ case class CarbonAlterTableCompactionCommand(
}
}
- LOGGER.audit(s"Compaction request received for table " +
+ Audit.log(LOGGER, s"Compaction request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -313,7 +314,7 @@ case class CarbonAlterTableCompactionCommand(
throw e
}
} else {
- LOGGER.audit("Not able to acquire the compaction lock for table " +
+ Audit.log(LOGGER, "Not able to acquire the compaction lock for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
LOGGER.error(s"Not able to acquire the compaction lock for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -328,7 +329,7 @@ case class CarbonAlterTableCompactionCommand(
operationContext: OperationContext,
sparkSession: SparkSession
): Unit = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
// 1. delete the lock of streaming.lock, forcing the stream to be closed
val streamingLock = CarbonLockFactory.getCarbonLockObj(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
index a477167..ba20773 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -33,7 +33,7 @@ case class CarbonAlterTableFinishStreaming(
extends MetadataCommand {
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val carbonTable = CarbonEnv.getCarbonTable(dbName, tableName)(sparkSession)
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val streamingLock = CarbonLockFactory.getCarbonLockObj(
carbonTable.getTableInfo().getOrCreateAbsoluteTableIdentifier(),
LockUsage.STREAMING_LOCK)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index e561a5a..a390191 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -147,7 +147,7 @@ case class CarbonCleanFilesCommand(
case e: Throwable =>
// catch all exceptions to avoid failure
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- .error(e, "Failed to clean in progress segments")
+ .error("Failed to clean in progress segments", e)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 7cf8c1e..ee0f5ab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -36,7 +36,7 @@ case class CarbonInsertIntoCommand(
var loadCommand: CarbonLoadDataCommand = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
def containsLimit(plan: LogicalPlan): Boolean = {
plan find {
case limit: GlobalLimit => true
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 43c8b86..22d0bb3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.conf.Configuration
+import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql._
@@ -48,7 +49,8 @@ import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -109,7 +111,7 @@ case class CarbonLoadDataCommand(
var parentTablePath: String = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
table = if (tableInfoOp.isDefined) {
CarbonTable.buildFromTableInfo(tableInfoOp.get)
@@ -121,7 +123,7 @@ case class CarbonLoadDataCommand(
}
if (null == relation.carbonTable) {
LOGGER.error(s"Data loading failed. table not found: $dbName.$tableName")
- LOGGER.audit(s"Data loading failed. table not found: $dbName.$tableName")
+ Audit.log(LOGGER, s"Data loading failed. table not found: $dbName.$tableName")
throw new NoSuchTableException(dbName, tableName)
}
relation.carbonTable
@@ -150,7 +152,7 @@ case class CarbonLoadDataCommand(
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
var concurrentLoadLock: Option[ICarbonLock] = None
carbonProperty.addProperty("zookeeper.enable.lock", "false")
@@ -341,7 +343,7 @@ case class CarbonLoadDataCommand(
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
- LOGGER.error(ex, s"Dataload failure for $dbName.$tableName")
+ LOGGER.error(s"Dataload failure for $dbName.$tableName", ex)
throw new RuntimeException(s"Dataload failure for $dbName.$tableName, ${ex.getMessage}")
// In case of event related exception
case preEventEx: PreEventException =>
@@ -352,7 +354,7 @@ case class CarbonLoadDataCommand(
if (isUpdateTableStatusRequired) {
CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uuid)
}
- LOGGER.audit(s"Dataload failure for $dbName.$tableName. Please check the logs")
+ Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. Please check the logs")
throw ex
} finally {
releaseConcurrentLoadLock(concurrentLoadLock, LOGGER)
@@ -369,7 +371,7 @@ case class CarbonLoadDataCommand(
} catch {
case ex: Exception =>
LOGGER.error(ex)
- LOGGER.audit(s"Dataload failure for $dbName.$tableName. " +
+ Audit.log(LOGGER, s"Dataload failure for $dbName.$tableName. " +
"Problem deleting the partition folder")
throw ex
}
@@ -377,10 +379,10 @@ case class CarbonLoadDataCommand(
}
} catch {
case dle: DataLoadingException =>
- LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
+ Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + dle.getMessage)
throw dle
case mce: MalformedCarbonCommandException =>
- LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
+ Audit.log(LOGGER, s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
throw mce
}
Seq.empty
@@ -412,7 +414,7 @@ case class CarbonLoadDataCommand(
}
private def releaseConcurrentLoadLock(concurrentLoadLock: Option[ICarbonLock],
- LOGGER: LogService): Unit = {
+ LOGGER: Logger): Unit = {
if (concurrentLoadLock.isDefined) {
if (concurrentLoadLock.get.unlock()) {
LOGGER.info("concurrent_load lock for table" + table.getTablePath +
@@ -432,7 +434,7 @@ case class CarbonLoadDataCommand(
partitionStatus: SegmentStatus,
hadoopConf: Configuration,
operationContext: OperationContext,
- LOGGER: LogService): Seq[Row] = {
+ LOGGER: Logger): Seq[Row] = {
var rows = Seq.empty[Row]
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -561,7 +563,7 @@ case class CarbonLoadDataCommand(
partitionStatus: SegmentStatus,
hadoopConf: Configuration,
operationContext: OperationContext,
- LOGGER: LogService): Seq[Row] = {
+ LOGGER: Logger): Seq[Row] = {
var rows = Seq.empty[Row]
val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
@@ -615,9 +617,8 @@ case class CarbonLoadDataCommand(
hadoopConf: Configuration,
dataFrame: Option[DataFrame],
operationContext: OperationContext,
- LOGGER: LogService): Seq[Row] = {
+ LOGGER: Logger): Seq[Row] = {
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
var timeStampformatString = carbonLoadModel.getTimestampformat
if (timeStampformatString.isEmpty) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index cf88fb9..39e85ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.PartitionSpec
@@ -47,8 +48,7 @@ case class RefreshCarbonTableCommand(
databaseNameOp: Option[String],
tableName: String)
extends MetadataCommand {
- val LOGGER: LogService =
- LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -88,7 +88,7 @@ case class RefreshCarbonTableCommand(
val msg = s"Table registration with Database name [$databaseName] and Table name " +
s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
s" not copied under database [$databaseName]"
- LOGGER.audit(msg)
+ Audit.log(LOGGER, msg)
throwMetadataException(databaseName, tableName, msg)
}
// 2.2.1 Register the aggregate tables to hive
@@ -101,14 +101,14 @@ case class RefreshCarbonTableCommand(
registerAllPartitionsToHive(identifier, sparkSession)
}
} else {
- LOGGER.audit(
+ Audit.log(LOGGER,
s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
s"failed." +
s"Table [$tableName] either non carbon table or stale carbon table under database " +
s"[$databaseName]")
}
} else {
- LOGGER.audit(
+ Audit.log(LOGGER,
s"Table registration with Database name [$databaseName] and Table name [$tableName] " +
s"failed." +
s"Table [$tableName] either already exists or registered under database [$databaseName]")
@@ -154,7 +154,7 @@ case class RefreshCarbonTableCommand(
OperationListenerBus.getInstance.fireEvent(refreshTablePreExecutionEvent, operationContext)
CarbonCreateTableCommand(tableInfo, ifNotExistsSet = false, tableLocation = Some(tablePath))
.run(sparkSession)
- LOGGER.audit(s"Table registration with Database name [$dbName] and Table name " +
+ Audit.log(LOGGER, s"Table registration with Database name [$dbName] and Table name " +
s"[$tableName] is successful.")
} catch {
case e: AnalysisException => throw e
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 0127d7e..053937b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -21,8 +21,10 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
+import org.apache.carbondata.api.CarbonStore.LOGGER
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.features.TableOperation
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -79,7 +81,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
var lockStatus = false
try {
lockStatus = metadataLock.lockWithRetries()
- LOGGER.audit(s" Delete data request has been received " +
+ Audit.log(LOGGER, s" Delete data request has been received " +
s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
if (lockStatus) {
LOGGER.info("Successfully able to get the table metadata file lock")
@@ -119,7 +121,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
case e: Exception =>
- LOGGER.error(e, "Exception in Delete data operation " + e.getMessage)
+ LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
// ****** start clean up.
// In case of failure , clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 4e9c1af..31e1779 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -163,7 +163,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
case e: Exception =>
- LOGGER.error(e, "Exception in update operation")
+ LOGGER.error("Exception in update operation", e)
// ****** start clean up.
// In case of failure , clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, currentTime + "")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 7e7f671..d118539 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -20,9 +20,7 @@ package org.apache.spark.sql.execution.command.mutation
import java.util
import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
@@ -34,7 +32,9 @@ import org.apache.spark.sql.execution.command.ExecutionErrors
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.api.CarbonStore.LOGGER
+import org.apache.carbondata.common.logging.impl.Audit
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -46,13 +46,12 @@ import org.apache.carbondata.core.util.{CarbonUtil, ThreadLocalSessionInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.exception.MultipleMatchingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.spark.DeleteDelataResultImpl
object DeleteExecution {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
/**
* generate the delete delta files in each segment as per the RDD.
@@ -167,7 +166,7 @@ object DeleteExecution {
} else {
// In case of failure , clean all related delete delta files
CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
- LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+ Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
val errorMsg =
"Delete data operation is failed due to failure in creating delete delta file for " +
"segment : " + resultOfBlock._2._1.getSegmentName + " block : " +
@@ -202,7 +201,7 @@ object DeleteExecution {
listOfSegmentToBeMarkedDeleted)
) {
LOGGER.info(s"Delete data operation is successful for ${ database }.${ tableName }")
- LOGGER.audit(s"Delete data operation is successful for ${ database }.${ tableName }")
+ Audit.log(LOGGER, s"Delete data operation is successful for ${ database }.${ tableName }")
}
else {
// In case of failure , clean all related delete delta files
@@ -210,7 +209,7 @@ object DeleteExecution {
val errorMessage = "Delete data operation is failed due to failure " +
"in table status updation."
- LOGGER.audit(s"Delete data operation is failed for ${ database }.${ tableName }")
+ Audit.log(LOGGER, s"Delete data operation is failed for ${ database }.${ tableName }")
LOGGER.error("Delete data operation is failed due to failure in table status updation.")
executorErrors.failureCauses = FailureCauses.STATUS_FILE_UPDATION_FAILURE
executorErrors.errorMsg = errorMessage
@@ -291,12 +290,12 @@ object DeleteExecution {
deleteStatus = SegmentStatus.SUCCESS
} catch {
case e : MultipleMatchingException =>
- LOGGER.audit(e.getMessage)
+ Audit.log(LOGGER, e.getMessage)
LOGGER.error(e.getMessage)
// dont throw exception here.
case e: Exception =>
val errorMsg = s"Delete data operation is failed for ${ database }.${ tableName }."
- LOGGER.audit(errorMsg)
+ Audit.log(LOGGER, errorMsg)
LOGGER.error(errorMsg + e.getMessage)
throw e
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 35fc3c3..3472d8a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -25,21 +25,20 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql._
import org.apache.spark.sql.execution.command.AlterTableModel
import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompactionCommand
-import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.util.SparkSQLUtil
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.ThreadLocalSessionInfo
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
object HorizontalCompaction {
- val LOG: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOG = LogServiceFactory.getLogService(this.getClass.getName)
/**
* The method does horizontal compaction. After Update and Delete completion
@@ -131,7 +130,7 @@ object HorizontalCompaction {
}
LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].")
- LOG.audit(s"Horizontal Update Compaction operation started for [$db.$table].")
+ Audit.log(LOG, s"Horizontal Update Compaction operation started for [$db.$table].")
try {
// Update Compaction.
@@ -155,7 +154,7 @@ object HorizontalCompaction {
s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
}
LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
- LOG.audit(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
+ Audit.log(LOG, s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
}
/**
@@ -181,7 +180,7 @@ object HorizontalCompaction {
}
LOG.info(s"Horizontal Delete Compaction operation started for [$db.$table].")
- LOG.audit(s"Horizontal Delete Compaction operation started for [$db.$table].")
+ Audit.log(LOG, s"Horizontal Delete Compaction operation started for [$db.$table].")
try {
@@ -226,7 +225,7 @@ object HorizontalCompaction {
timestamp.toString,
segmentUpdateStatusManager)
if (updateStatus == false) {
- LOG.audit(s"Delete Compaction data operation is failed for [$db.$table].")
+ Audit.log(LOG, s"Delete Compaction data operation is failed for [$db.$table].")
LOG.error("Delete Compaction data operation is failed.")
throw new HorizontalCompactionException(
s"Horizontal Delete Compaction Failed for [$db.$table] ." +
@@ -234,7 +233,7 @@ object HorizontalCompaction {
}
else {
LOG.info(s"Horizontal Delete Compaction operation completed for [$db.$table].")
- LOG.audit(s"Horizontal Delete Compaction operation completed for [$db.$table].")
+ Audit.log(LOG, s"Horizontal Delete Compaction operation completed for [$db.$table].")
}
}
catch {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b76a485..c230322 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -28,13 +28,14 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.AlterTableUtil
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
import org.apache.carbondata.core.datastore.compression.CompressorFactory
import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -48,7 +49,7 @@ case class CarbonAlterTableDropPartitionCommand(
model: AlterTableDropPartitionModel)
extends AtomicRunnableCommand {
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -121,7 +122,7 @@ case class CarbonAlterTableDropPartitionCommand(
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
val dbName = model.databaseName.getOrElse(sparkSession.catalog.currentDatabase)
val tableName = model.tableName
var locks = List.empty[ICarbonLock]
@@ -168,7 +169,7 @@ case class CarbonAlterTableDropPartitionCommand(
LOGGER.info("Locks released after alter table drop partition action.")
}
LOGGER.info(s"Alter table drop partition is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table drop partition is successful for table $dbName.$tableName")
+ Audit.log(LOGGER, s"Alter table drop partition is successful for table $dbName.$tableName")
Seq.empty
}
@@ -177,7 +178,7 @@ case class CarbonAlterTableDropPartitionCommand(
carbonLoadModel: CarbonLoadModel,
dropWithData: Boolean,
oldPartitionIds: List[Int]): Unit = {
- LOGGER.audit(s"Drop partition request received for table " +
+ Audit.log(LOGGER, s"Drop partition request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startDropThreads(
@@ -246,7 +247,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
dropWithData: Boolean,
oldPartitionIds: List[Int]) extends Thread {
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def run(): Unit = {
try {
@@ -254,8 +255,8 @@ case class dropPartitionThread(sqlContext: SQLContext,
segmentId, partitionId, dropWithData, oldPartitionIds)
} catch {
case e: Exception =>
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
- LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }")
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }", e)
}
}
@@ -274,7 +275,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
future.get
} catch {
case e: Exception =>
- LOGGER.error(e, s"Exception in partition drop thread ${ e.getMessage }")
+ LOGGER.error(s"Exception in partition drop thread ${ e.getMessage }", e)
throw e
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 753abaf..8b337c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{AlterTableUtil, PartitionUtils}
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.impl.Audit
import org.apache.carbondata.core.cache.CacheProvider
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -54,7 +55,7 @@ case class CarbonAlterTableSplitPartitionCommand(
splitPartitionModel: AlterTableSplitPartitionModel)
extends AtomicRunnableCommand {
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
private val oldPartitionIds: util.ArrayList[Int] = new util.ArrayList[Int]()
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
@@ -182,11 +183,12 @@ case class CarbonAlterTableSplitPartitionCommand(
} finally {
AlterTableUtil.releaseLocks(locks)
CacheProvider.getInstance().dropAllCache()
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
LOGGER.info("Locks released after alter table add/split partition action.")
if (success) {
LOGGER.info(s"Alter table add/split partition is successful for table $dbName.$tableName")
- LOGGER.audit(s"Alter table add/split partition is successful for table $dbName.$tableName")
+ Audit.log(LOGGER,
+ s"Alter table add/split partition is successful for table $dbName.$tableName")
}
}
Seq.empty
@@ -198,7 +200,7 @@ case class CarbonAlterTableSplitPartitionCommand(
carbonLoadModel: CarbonLoadModel,
oldPartitionIdList: List[Int]
): Unit = {
- LOGGER.audit(s"Add partition request received for table " +
+ Audit.log(LOGGER, s"Add partition request received for table " +
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
startSplitThreads(sqlContext,
@@ -264,7 +266,7 @@ case class SplitThread(sqlContext: SQLContext,
partitionId: String,
oldPartitionIdList: List[Int]) extends Thread {
- private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
override def run(): Unit = {
var triggeredSplitPartitionStatus = false
@@ -275,7 +277,7 @@ case class SplitThread(sqlContext: SQLContext,
triggeredSplitPartitionStatus = true
} catch {
case e: Exception =>
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
LOGGER.error(s"Exception in partition split thread: ${ e.getMessage } }")
exception = e
}
@@ -301,7 +303,7 @@ case class SplitThread(sqlContext: SQLContext,
}
} catch {
case e: Exception =>
- LOGGER.error(e, s"Exception in partition split thread ${ e.getMessage }")
+ LOGGER.error(s"Exception in partition split thread ${ e.getMessage }", e)
throw e
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index b33652f..f606c04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -64,7 +64,7 @@ object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
trait CommitHelper {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
operationContext: OperationContext,
@@ -586,7 +586,7 @@ object CommitPreAggregateListener extends OperationEventListener with CommitHelp
} catch {
case e: Exception =>
operationContext.setProperty("commitComplete", false)
- LOGGER.error(e, "Problem while committing data maps")
+ LOGGER.error("Problem while committing data maps", e)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/06adb5a0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index f26d1cb..d16f570 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -419,11 +419,10 @@ object PreAggregateUtil {
*/
def updateMainTable(carbonTable: CarbonTable,
childSchema: DataMapSchema, sparkSession: SparkSession): TableInfo = {
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
LockUsage.DROP_TABLE_LOCK)
var locks = List.empty[ICarbonLock]
- var numberOfCurrentChild: Int = 0
val dbName = carbonTable.getDatabaseName
val tableName = carbonTable.getTableName
try {
@@ -450,7 +449,7 @@ object PreAggregateUtil {
thriftTableInfo
} catch {
case e: Exception =>
- LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
+ LOGGER.error("Pre Aggregate Parent table update failed reverting changes", e)
throw e
} finally {
// release lock after command execution completion