You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:21 UTC

[23/47] incubator-carbondata git commit: [CARBONDATA-106] Added audit logs for create and load command failures (#868)

[CARBONDATA-106] Added audit logs for create and load command failures (#868)



Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/f6e9fbc9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/f6e9fbc9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/f6e9fbc9

Branch: refs/heads/master
Commit: f6e9fbc9b892d327abafbf71d57c088bb66c8041
Parents: ad1c985
Author: Manu <ma...@gmail.com>
Authored: Mon Jul 25 21:28:05 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Mon Jul 25 21:28:05 2016 +0530

----------------------------------------------------------------------
 .../org/apache/spark/sql/CarbonSqlParser.scala  | 201 ++++++++++---------
 .../execution/command/carbonTableSchema.scala   |   8 +
 .../scala/org/apache/spark/util/FileUtils.scala |   6 +-
 .../TestLoadDataWithNotProperInputFile.scala    |   6 +-
 4 files changed, 124 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index e222bec..fdfc683 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -36,9 +36,11 @@ import org.apache.spark.sql.execution.command.{DimensionRelation, _}
 import org.apache.spark.sql.execution.datasources.DescribeCommand
 import org.apache.spark.sql.hive.HiveQlWrapper
 
+import org.carbondata.common.logging.LogServiceFactory
 import org.carbondata.core.carbon.metadata.datatype.DataType
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.util.DataTypeUtil
+import org.carbondata.processing.etl.DataLoadingException
 import org.carbondata.spark.exception.MalformedCarbonCommandException
 import org.carbondata.spark.util.CommonUtil
 
@@ -48,6 +50,7 @@ import org.carbondata.spark.util.CommonUtil
 class CarbonSqlParser()
   extends AbstractSparkSQLParser with Logging {
 
+  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   protected val AGGREGATE = carbonKeyWord("AGGREGATE")
   protected val AS = carbonKeyWord("AS")
   protected val AGGREGATION = carbonKeyWord("AGGREGATION")
@@ -333,108 +336,124 @@ class CarbonSqlParser()
       // if create table taken is found then only we will handle.
       case Token("TOK_CREATETABLE", children) =>
 
-        var fields: Seq[Field] = Seq[Field]()
-        var tableComment: String = ""
-        var tableProperties = Map[String, String]()
-        var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
-        var likeTableName: String = ""
-        var storedBy: String = ""
-        var ifNotExistPresent: Boolean = false
-        var dbName: Option[String] = None
-        var tableName: String = ""
 
-        children.collect {
-          // collecting all the field  list
-          case list@Token("TOK_TABCOLLIST", _) =>
-            val cols = BaseSemanticAnalyzer.getColumns(list, true)
-            if (cols != null) {
-              val dupColsGrp = cols.asScala
-                                 .groupBy(x => x.getName) filter { case (_, colList) => colList
-                                                                                          .size > 1
-                               }
-              if (dupColsGrp.size > 0) {
-                var columnName: String = ""
-                dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
-                columnName = columnName.substring(0, columnName.lastIndexOf(", "))
-                val errorMessage = "Duplicate column name: " + columnName + " found in table " +
-                                   ".Please check create table statement."
-                throw new MalformedCarbonCommandException(errorMessage)
-              }
-              cols.asScala.map { col =>
-                val columnName = col.getName()
-                val dataType = Option(col.getType)
-                val name = Option(col.getName())
-                // This is to parse complex data types
-                val x = col.getName + ' ' + col.getType
-                val f: Field = anyFieldDef(new lexical.Scanner(x))
-                match {
-                  case Success(field, _) => field
-                  case failureOrError => new Field(columnName, dataType, name, None, null,
-                    Some("columnar"))
+          var fields: Seq[Field] = Seq[Field]()
+          var tableComment: String = ""
+          var tableProperties = Map[String, String]()
+          var partitionCols: Seq[PartitionerField] = Seq[PartitionerField]()
+          var likeTableName: String = ""
+          var storedBy: String = ""
+          var ifNotExistPresent: Boolean = false
+          var dbName: Option[String] = None
+          var tableName: String = ""
+
+          try {
+
+          children.collect {
+            // collecting all the field  list
+            case list@Token("TOK_TABCOLLIST", _) =>
+              val cols = BaseSemanticAnalyzer.getColumns(list, true)
+              if (cols != null) {
+                val dupColsGrp = cols.asScala
+                                   .groupBy(x => x.getName) filter { case (_, colList) => colList
+                                                                                            .size >
+                                                                                          1
+                                 }
+                if (dupColsGrp.size > 0) {
+                  var columnName: String = ""
+                  dupColsGrp.toSeq.foreach(columnName += _._1 + ", ")
+                  columnName = columnName.substring(0, columnName.lastIndexOf(", "))
+                  val errorMessage = "Duplicate column name: " + columnName + " found in table " +
+                                     ".Please check create table statement."
+                  throw new MalformedCarbonCommandException(errorMessage)
                 }
-                // the data type of the decimal type will be like decimal(10,0)
-                // so checking the start of the string and taking the precision and scale.
-                // resetting the data type with decimal
-                if (f.dataType.getOrElse("").startsWith("decimal")) {
-                  val (precision, scale) = getScaleAndPrecision(col.getType)
-                  f.precision = precision
-                  f.scale = scale
-                  f.dataType = Some("decimal")
+                cols.asScala.map { col =>
+                  val columnName = col.getName()
+                  val dataType = Option(col.getType)
+                  val name = Option(col.getName())
+                  // This is to parse complex data types
+                  val x = col.getName + ' ' + col.getType
+                  val f: Field = anyFieldDef(new lexical.Scanner(x))
+                  match {
+                    case Success(field, _) => field
+                    case failureOrError => new Field(columnName, dataType, name, None, null,
+                      Some("columnar"))
+                  }
+                  // the data type of the decimal type will be like decimal(10,0)
+                  // so checking the start of the string and taking the precision and scale.
+                  // resetting the data type with decimal
+                  if (f.dataType.getOrElse("").startsWith("decimal")) {
+                    val (precision, scale) = getScaleAndPrecision(col.getType)
+                    f.precision = precision
+                    f.scale = scale
+                    f.dataType = Some("decimal")
+                  }
+                  fields ++= Seq(f)
                 }
-                fields ++= Seq(f)
               }
-            }
-
-          case Token("TOK_IFNOTEXISTS", _) =>
-            ifNotExistPresent = true
 
-          case t@Token("TOK_TABNAME", _) =>
-            val (db, tblName) = extractDbNameTableName(t)
-            dbName = db
-            tableName = tblName.toLowerCase()
-
-          case Token("TOK_TABLECOMMENT", child :: Nil) =>
-            tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
-
-          case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
-            val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
-            if (cols != null) {
-              cols.asScala.map { col =>
-                val columnName = col.getName()
-                val dataType = Option(col.getType)
-                val comment = col.getComment
-                val partitionCol = new PartitionerField(columnName, dataType, comment)
-                partitionCols ++= Seq(partitionCol)
+            case Token("TOK_IFNOTEXISTS", _) =>
+              ifNotExistPresent = true
+
+            case t@Token("TOK_TABNAME", _) =>
+              val (db, tblName) = extractDbNameTableName(t)
+              dbName = db
+              tableName = tblName.toLowerCase()
+
+            case Token("TOK_TABLECOMMENT", child :: Nil) =>
+              tableComment = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+
+            case Token("TOK_TABLEPARTCOLS", list@Token("TOK_TABCOLLIST", _) :: Nil) =>
+              val cols = BaseSemanticAnalyzer.getColumns(list(0), false)
+              if (cols != null) {
+                cols.asScala.map { col =>
+                  val columnName = col.getName()
+                  val dataType = Option(col.getType)
+                  val comment = col.getComment
+                  val partitionCol = new PartitionerField(columnName, dataType, comment)
+                  partitionCols ++= Seq(partitionCol)
+                }
               }
-            }
-          case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
-            tableProperties ++= getProperties(list)
+            case Token("TOK_TABLEPROPERTIES", list :: Nil) =>
+              tableProperties ++= getProperties(list)
 
-          case Token("TOK_LIKETABLE", child :: Nil) =>
-            likeTableName = child.getChild(0).getText()
+            case Token("TOK_LIKETABLE", child :: Nil) =>
+              likeTableName = child.getChild(0).getText()
 
-          case Token("TOK_STORAGEHANDLER", child :: Nil) =>
-            storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
+            case Token("TOK_STORAGEHANDLER", child :: Nil) =>
+              storedBy = BaseSemanticAnalyzer.unescapeSQLString(child.getText)
 
-          case _ => // Unsupport features
-        }
-
-        if (!storedBy.equals(CarbonContext.datasourceName)) {
-          // TODO: should execute by Hive instead of error
-          sys.error("Not a carbon format request")
-        }
+            case _ => // Unsupport features
+          }
 
-      // validate tblProperties
-      if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
-        throw new MalformedCarbonCommandException("Invalid table properties")
-      }
-      // prepare table model of the collected tokens
-      val tableModel: tableModel = prepareTableModel(ifNotExistPresent, dbName, tableName, fields,
-        partitionCols,
-        tableProperties)
+          if (!storedBy.equals(CarbonContext.datasourceName)) {
+            // TODO: should execute by Hive instead of error
+            sys.error("Not a carbon format request")
+          }
 
-        // get logical plan.
-        CreateTable(tableModel)
+          // validate tblProperties
+          if (!CommonUtil.validateTblProperties(tableProperties, fields)) {
+            throw new MalformedCarbonCommandException("Invalid table properties")
+          }
+          // prepare table model of the collected tokens
+          val tableModel: tableModel = prepareTableModel(ifNotExistPresent,
+            dbName,
+            tableName,
+            fields,
+            partitionCols,
+            tableProperties)
+
+          // get logical plan.
+          CreateTable(tableModel)
+        }
+        catch {
+          case ce: MalformedCarbonCommandException =>
+            val message = if (tableName.isEmpty) "Create table command failed. "
+            else if (!dbName.isDefined) s"Create table command failed for $tableName. "
+            else s"Create table command failed for ${dbName.get}.$tableName. "
+            LOGGER.audit(message + ce.getMessage)
+            throw ce
+        }
 
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 5da01ac..897759f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -52,6 +52,7 @@ import org.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.carbondata.integration.spark.merger.CompactionType
 import org.carbondata.lcm.status.SegmentStatusManager
+import org.carbondata.processing.etl.DataLoadingException
 import org.carbondata.spark.exception.MalformedCarbonCommandException
 import org.carbondata.spark.load._
 import org.carbondata.spark.partition.api.impl.QueryPartitionHelper
@@ -1626,6 +1627,13 @@ private[sql] case class LoadCube(
         }
 
       }
+    } catch {
+      case dle: DataLoadingException =>
+        LOGGER.audit(s"Dataload failed for $schemaName.$tableName. " + dle.getMessage)
+        throw dle
+      case mce: MalformedCarbonCommandException =>
+        LOGGER.audit(s"Dataload failed for $schemaName.$tableName. " + mce.getMessage)
+        throw mce
     } finally {
       if (carbonLock != null) {
         if (carbonLock.unlock()) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
index 8344956..0fd43a3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -57,7 +57,7 @@ object FileUtils extends Logging {
    */
   def getPaths(inputPath: String): String = {
     if (inputPath == null || inputPath.isEmpty) {
-      throw new DataLoadingException("input file path cannot be empty.")
+      throw new DataLoadingException("Input file path cannot be empty.")
     } else {
       val stringBuild = new StringBuilder()
       val filePaths = inputPath.split(",")
@@ -65,14 +65,14 @@ object FileUtils extends Logging {
         val fileType = FileFactory.getFileType(filePaths(i))
         val carbonFile = FileFactory.getCarbonFile(filePaths(i), fileType)
         if (!carbonFile.exists()) {
-          throw new DataLoadingException(s"the input file does not exist: ${filePaths(i)}" )
+          throw new DataLoadingException(s"The input file does not exist: ${filePaths(i)}" )
         }
         getPathsFromCarbonFile(carbonFile, stringBuild)
       }
       if (stringBuild.nonEmpty) {
         stringBuild.substring(0, stringBuild.size - 1)
       } else {
-        throw new DataLoadingException("please check your input path and make sure " +
+        throw new DataLoadingException("Please check your input path and make sure " +
           "that files end with '.csv' and content is not empty.")
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/f6e9fbc9/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
index b5830c4..9751f0b 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/dataload/TestLoadDataWithNotProperInputFile.scala
@@ -44,7 +44,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
       GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel)
     } catch {
       case e: Throwable =>
-        assert(e.getMessage.contains("please check your input path and make sure " +
+        assert(e.getMessage.contains("Please check your input path and make sure " +
           "that files end with '.csv' and content is not empty"))
     }
   }
@@ -58,7 +58,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
       GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel)
     } catch {
       case e: Throwable =>
-        assert(e.getMessage.contains("please check your input path and make sure " +
+        assert(e.getMessage.contains("Please check your input path and make sure " +
           "that files end with '.csv' and content is not empty"))
     }
   }
@@ -72,7 +72,7 @@ class TestLoadDataWithNotProperInputFile extends QueryTest {
       GlobalDictionaryUtil.loadDataFrame(CarbonHiveContext, carbonLoadModel)
     } catch {
       case e: Throwable =>
-        assert(e.getMessage.contains("the input file does not exist"))
+        assert(e.getMessage.contains("The input file does not exist"))
     }
   }
 }