Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:21 UTC
[23/47] incubator-carbondata git commit: [CARBONDATA-106] Added audit logs for create and load command failures (#868)
[CARBONDATA-106] Added audit logs for create and load command failures (#868)
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f6e9fbc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f6e9fbc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f6e9fbc9
Branch: refs/heads/master
Commit: f6e9fbc9b892d327abafbf71d57c088bb66c8041
Parents: ad1c985
Author: Manu <ma...@gmail.com>
Authored: Mon Jul 25 21:28:05 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Mon Jul 25 21:28:05 2016 +0530
----------------------------------------------------------------------
.../org/apache/spark/sql/CarbonSqlParser.scala | 201 ++++++++++---------
.../execution/command/carbonTableSchema.scala | 8 +
.../scala/org/apache/spark/util/FileUtils.scala | 6 +-
.../TestLoadDataWithNotProperInputFile.scala | 6 +-
4 files changed, 124 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
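The pattern the commit applies in both commands is the same: run the body inside a try, write an audit entry when a known command failure surfaces, and rethrow so error propagation is unchanged. Below is a minimal, self-contained sketch of that pattern; AuditLog, MalformedCommandException and runAudited are hypothetical stand-ins for CarbonData's LogService, its command exception, and the command bodies in the diffs that follow.

object AuditSketch {
  // hypothetical stand-in for LogServiceFactory.getLogService(...).audit
  object AuditLog {
    def audit(msg: String): Unit = Console.err.println(s"AUDIT: $msg")
  }

  final class MalformedCommandException(msg: String) extends Exception(msg)

  // run a command body; on a known failure, audit with context, then rethrow
  def runAudited[T](context: String)(body: => T): T =
    try body
    catch {
      case ce: MalformedCommandException =>
        AuditLog.audit(s"$context failed. ${ce.getMessage}")
        throw ce // callers still see the original exception
    }
}

// usage (the table name is a made-up example):
// AuditSketch.runAudited("Create table command for default.t1") { /* parse, build table model */ }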
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index e222bec..fdfc683 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -36,9 +36,11 @@ import org.apache.spark.sql.execution.command.{DimensionRelation, _}
import org.apache.spark.sql.execution.datasources.DescribeCommand
import org.apache.spark.sql.hive.HiveQlWrapper
+import org.carbondata.common.logging.LogServiceFactory
import org.carbondata.core.carbon.metadata.datatype.DataType
import org.carbondata.core.constants.CarbonCommonConstants
import org.carbondata.core.util.DataTypeUtil
+import org.carbondata.processing.etl.DataLoadingException
import org.carbondata.spark.exception.MalformedCarbonCommandException
import org.carbondata.spark.util.CommonUtil
@@ -48,6 +50,7 @@ import org.carbondata.spark.util.CommonUtil
class CarbonSqlParser()
extends AbstractSparkSQLParser with Logging {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
protected val AGGREGATE = carbonKeyWord("AGGREGATE")
protected val AS = carbonKeyWord("AS")
protected val AGGREGATION = carbonKeyWord("AGGREGATION")
@@ -333,108 +336,124 @@ class CarbonSqlParser()
// handle this node only if a CREATE TABLE token is found.
case Token("TOK_CREATETABLE", children) =>
- var fields: Seq[Field] = Seq[Field]()
- var tableComment: String = ""
- var tableProperties = Map[String, String]()
- var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
- var likeTableName: String = ""
- var storedBy: String = ""
- var ifNotExistPresent: Boolean = false
- var dbName: Option[String] = None
- var tableName: String = ""
- children.collect {
- // collecting all the field list
- case list@Token("TOK_TABCOLLIST", _) =>
- val cols = BaseSemanticAnalyzer.getColumns(list, true)
- if (cols != null) {
- val dupColsGrp = cols.asScala
- .groupBy(x => x.getName) filter { case (_, colList) => colList
- .size > 1
- }
- if (dupColsGrp.size > 0) {
- var columnName: String = ""
- dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
- columnName = columnName.substring(0, columnName.lastIndexOf(", "))
- val errorMessage = "Duplicate column name: " + columnName + " found in table " +
- ".Please check create table statement."
- throw new MalformedCarbonCommandException(errorMessage)
- }
- cols.asScala.map { col =>
- val columnName = col.getName()
- val dataType = Option(col.getType)
- val name = Option(col.getName())
- // This is to parse complex data types
- val x = col.getName + ' ' + col.getType
- val f: Field = anyFieldDef(new lexical.Scanner(x))
- match {
- case Success(field, _) => field
- case failureOrError => new Field(columnName, dataType, name, None, null,
- Some("columnar"))
+ var fields: Seq[Field] = Seq[Field]()
+ var tableComment: String = ""
+ var tableProperties = Map[String, String]()
+ var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
+ var likeTableName: String = ""
+ var storedBy: String = ""
+ var ifNotExistPresent: Boolean = false
+ var dbName: Option[String] = None
+ var tableName: String = ""
+
+ try {
+
+ children.collect {
+ // collecting all the fields in the column list
+ case list@Token("TOK_TABCOLLIST", _) =>
+ val cols = BaseSemanticAnalyzer.getColumns(list, true)
+ if (cols != null) {
+ val dupColsGrp = cols.asScala
+ .groupBy(x => x.getName) filter { case (_, colList) => colList
+ .size > 1
+ }
+ if (dupColsGrp.size > 0) {
+ var columnName: String = ""
+ dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
+ columnName = columnName.substring(0, columnName.lastIndexOf(", "))
+ val errorMessage = "Duplicate column name: " + columnName + " found in table " +
+ ".Please check create table statement."
+ throw new MalformedCarbonCommandException(errorMessage)
}
- // the data type of the decimal type will be like decimal(10,0)
- // so checking the start of the string and taking the precision and scale.
- // resetting the data type with decimal
- if (f.dataType.getOrElse("").startsWith("decimal")) {
- val (precision, scale) = getScaleAndPrecision(col.getType)
- f.precision = precision
- f.scale = scale
- f.dataType = Some("decimal")
+ cols.asScala.map { col =>
+ val columnName = col.getName()
+ val dataType = Option(col.getType)
+ val name = Option(col.getName())
+ // This is to parse complex data types
+ val x = col.getName + ' ' + col.getType
+ val f: Field = anyFieldDef(new lexical.Scanner(x))
+ match {
+ case Success(field, _) => field
+ case failureOrError => new Field(columnName, dataType, name, None, null,
+ Some("columnar"))
+ }
+ // the data type of a decimal column comes in as e.g. decimal(10,0),
+ // so check the string prefix, extract the precision and scale,
+ // and reset the data type to plain decimal
+ if (f.dataType.getOrElse("").startsWith("decimal")) {
+ val (precision, scale) = getScaleAndPrecision(col.getType)
+ f.precision = precision
+ f.scale = scale
+ f.dataType = Some("decimal")
+ }
+ fields ++= Seq(f)
}
- fields ++= Seq(f)
}
- }
-
- case Token("TOK_IFNOTEXISTS", _) =>
- ifNotExistPresent = true
- case t@Token("TOK_TABNAME", _) =>
- val (db, tblName) = extractDbNameTableName(t)
- dbName = db
- tableName = tblName.toLowerCase()
-
- case Token("TOK_TABLECOMMENT", child :: Nil) =>
- tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
-
- case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
- val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
- if (cols != null) {
- cols.asScala.map { col =>
- val columnName = col.getName()
- val dataType = Option(col.getType)
- val comment = col.getComment
- val partitionCol = new PartitionerField(columnName, dataType, comment)
- partitionCols ++= Seq(partitionCol)
+ case Token("TOK_IFNOTEXISTS", _) =>
+ ifNotExistPresent = true
+
+ case t@Token("TOK_TABNAME", _) =>
+ val (db, tblName) = extractDbNameTableName(t)
+ dbName = db
+ tableName = tblName.toLowerCase()
+
+ case Token("TOK_TABLECOMMENT", child :: Nil) =>
+ tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+
+ case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
+ val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+ if (cols != null) {
+ cols.asScala.map { col =>
+ val columnName = col.getName()
+ val dataType = Option(col.getType)
+ val comment = col.getComment
+ val partitionCol = new PartitionerField(columnName, dataType, comment)
+ partitionCols ++= Seq(partitionCol)
+ }
}
- }
- case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
- tableProperties ++= getProperties(list)
+ case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+ tableProperties ++= getProperties(list)
- case Token("TOK_LIKETABLE", child :: Nil) =>
- likeTableName = child.getChild(0).getText()
+ case Token("TOK_LIKETABLE", child :: Nil) =>
+ likeTableName = child.getChild(0).getText()
- case Token("TOK_STORAGEHANDLER", child :: Nil) =>
- storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+ case Token("TOK_STORAGEHANDLER", child :: Nil) =>
+ storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
- case _ => // Unsupport features
- }
-
- if (!storedBy.equals(CarbonContext.datasourceName)) {
- // TODO: should execute by Hive instead of error
- sys.error("Not a carbon format request")
- }
+ case _ => // Unsupported features
+ }
- // validate tblProperties
- if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
- throw new MalformedCarbonCommandException("Invalid table properties")
- }
- // prepare table model of the collected tokens
- val tableModel: tableModel = prepareTableModel(ifNotExistPresent, dbName, tableName, fields,
- partitionCols,
- tableProperties)
+ if (!storedBy.equals(CarbonContext.datasourceName)) {
+ // TODO: should execute by Hive instead of error
+ sys.error("Not a carbon format request")
+ }
- // get logical plan.
- CreateTable(tableModel)
+ // validate tblProperties
+ if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
+ throw new MalformedCarbonCommandException("Invalid table properties")
+ }
+ // prepare table model of the collected tokens
+ val tableModel: tableModel = prepareTableModel(ifNotExistPresent,
+ dbName,
+ tableName,
+ fields,
+ partitionCols,
+ tableProperties)
+
+ // get logical plan.
+ CreateTable(tableModel)
+ }
+ catch {
+ case ce: MalformedCarbonCommandException =>
+ val message = if (tableName.isEmpty) "Create table command failed. "
+ else if (!dbName.isDefined) s"Create table command failed for $tableName. "
+ else s"Create table command failed for ${dbName.get}.$tableName. "
+ LOGGER.audit(message + ce.getMessage)
+ throw ce
+ }
}
}
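The catch block added above picks one of three message shapes, depending on how much of the statement had been parsed when the failure occurred. A small function mirroring that selection, illustrative only (dbName.isEmpty is equivalent to the diff's !dbName.isDefined; "default" and "t1" are made-up names):

def auditPrefix(dbName: Option[String], tableName: String): String =
  if (tableName.isEmpty) "Create table command failed. "
  else if (dbName.isEmpty) s"Create table command failed for $tableName. "
  else s"Create table command failed for ${dbName.get}.$tableName. "

// auditPrefix(None, "")              -> "Create table command failed. "
// auditPrefix(None, "t1")            -> "Create table command failed for t1. "
// auditPrefix(Some("default"), "t1") -> "Create table command failed for default.t1. "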
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 5da01ac..897759f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -52,6 +52,7 @@ import org.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.carbondata.integration.spark.merger.CompactionType
import org.carbondata.lcm.status.SegmentStatusManager
+import org.carbondata.processing.etl.DataLoadingException
import org.carbondata.spark.exception.MalformedCarbonCommandException
import org.carbondata.spark.load._
import org.carbondata.spark.partition.api.impl.QueryPartitionHelper
@@ -1626,6 +1627,13 @@ private[sql] case class LoadCube(
}
}
+ } catch {
+ case dle: DataLoadingException =>
+ LOGGER.audit(s"Dataload failed for $schemaName.$tableName. " + dle.getMessage)
+ throw dle
+ case mce: MalformedCarbonCommandException =>
+ LOGGER.audit(s"Dataload failed for $schemaName.$tableName. " + mce.getMessage)
+ throw mce
} finally {
if (carbonLock != null) {
if (carbonLock.unlock()) {
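Worth noting in this hunk: the new catch clauses sit ahead of the existing finally block, so the table lock is still released after an audited failure. A compact sketch of that control flow, reusing the hypothetical AuditLog from the first sketch; Lock and the two exception stubs stand in for CarbonData's lock and exception types:

final class DataLoadingException(msg: String) extends Exception(msg)
final class MalformedCarbonCommandException(msg: String) extends Exception(msg)
trait Lock { def unlock(): Boolean }

def loadAudited(schemaName: String, tableName: String, lock: Lock)(body: => Unit): Unit =
  try body
  catch {
    case e @ (_: DataLoadingException | _: MalformedCarbonCommandException) =>
      AuditLog.audit(s"Dataload failed for $schemaName.$tableName. " + e.getMessage)
      throw e
  } finally {
    // the unlock path runs whether the load succeeded or the failure was audited and rethrown
    if (lock != null) lock.unlock()
  }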
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
index 8344956..0fd43a3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -57,7 +57,7 @@ object FileUtils extends Logging {
*/
def getPaths(inputPath: String): String = {
if (inputPath == null || inputPath.isEmpty) {
- throw new DataLoadingException("input file path cannot be empty.")
+ throw new DataLoadingException("Input file path cannot be empty.")
} else {
val stringBuild = new StringBuilder()
val filePaths = inputPath.split(",")
@@ -65,14 +65,14 @@ object FileUtils extends Logging {
val fileType = FileFactory.getFileType(filePaths(i))
val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType)
if (!carbonFile.exists()) {
- throw new DataLoadingException(s"the input file does not exist: ${filePaths(i)}" )
+ throw new DataLoadingException(s"The input file does not exist: ${filePaths(i)}" )
}
getPathsFromCarbonFile(carbonFile, stringBuild)
}
if (stringBuild.nonEmpty) {
stringBuild.substring(0, stringBuild.size - 1)
} else {
- throw new DataLoadingException("please check your input path and make sure " +
+ throw new DataLoadingException("Please check your input path and make sure " +
"that files end with '.csv' and content is not empty.")
}
}
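For quick reference, the three validation failures in getPaths now carry capitalized messages. Expected behavior, with made-up paths (message text taken verbatim from the diff):

// getPaths(null)             -> DataLoadingException("Input file path cannot be empty.")
// getPaths("/missing/a.csv") -> DataLoadingException("The input file does not exist: /missing/a.csv")
// a directory with no non-empty '.csv' files ->
//   DataLoadingException("Please check your input path and make sure that files end with '.csv'
//   and content is not empty.")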
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
index b5830c4..9751f0b 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
@@ -44,7 +44,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel)
} catch {
case e: Throwable =>
- assert(e.getMessage.contains("please check your input path and make sure " +
+ assert(e.getMessage.contains("Please check your input path and make sure " +
"that files end with '.csv' and content is not empty"))
}
}
@@ -58,7 +58,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel)
} catch {
case e: Throwable =>
- assert(e.getMessage.contains("please check your input path and make sure " +
+ assert(e.getMessage.contains("Please check your input path and make sure " +
"that files end with '.csv' and content is not empty"))
}
}
@@ -72,7 +72,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel)
} catch {
case e: Throwable =>
- assert(e.getMessage.contains("the input file does not exist"))
+ assert(e.getMessage.contains("The input file does not exist"))
}
}
}