Posted to by mohammadshahidkhan <> on 2018/07/09 10:42:22 UTC

[GitHub] carbondata pull request #2466: [WIP][CARBONDATA-2710][Spark Integration] Ref...

GitHub user mohammadshahidkhan opened a pull request:

    [WIP][CARBONDATA-2710][Spark Integration] Refactor CarbonSparkSqlParser for better code reuse.

    Be sure to do all of the following checklist to help us incorporate 
    your contribution quickly and easily:
     - [ ] Any interfaces changed?
     - [ ] Any backward compatibility impacted?
     - [ ] Document update required?
     - [ ] Testing done
            Please provide details on 
            - Whether new unit test cases have been added or why no new tests are required?
            - How it is tested? Please attach test report.
            - Is it a performance related change? Please attach the performance test report.
            - Any additional information to help reviewers in testing this change.
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 

You can merge this pull request into a Git repository by running:

    $ git pull refactor_spark_integration

Alternatively you can review and apply these changes as the patch at:

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2466
commit 47ffeff1293f60c572985500bf875fa88b34931f
Author: mohammadshahidkhan <mo...@...>
Date:   2018-07-09T10:38:47Z

    [CARBONDATA-2710][Spark Integration] Refactor CarbonSparkSqlParser for better code reuse.



[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by KanakaKumar <>.
Github user KanakaKumar commented on the issue:


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Failed with Spark 2.1.0, Please check CI


[GitHub] carbondata issue #2466: [WIP][CARBONDATA-2710][Spark Integration] Refactor C...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.2.1, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by mohammadshahidkhan <>.
Github user mohammadshahidkhan commented on the issue:
    retest this please


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.2.1, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.2.1, Please check CI


[GitHub] carbondata issue #2466: [WIP][CARBONDATA-2710][Spark Integration] Refactor C...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.1.0, Please check CI


[GitHub] carbondata pull request #2466: [CARBONDATA-2710][Spark Integration] Refactor...

Posted by KanakaKumar <>.
Github user KanakaKumar commented on a diff in the pull request:
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala ---
    @@ -169,220 +128,45 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
           provider) = createTableTuple
         val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
    -    // TODO: implement temporary tables
    -    if (temp) {
    -      throw new ParseException(
    -        "CREATE TEMPORARY TABLE is not supported yet. " +
    -        "Please use CREATE TEMPORARY VIEW as an alternative.", tableHeader)
    -    }
    -    if (skewSpecContext != null) {
    -      operationNotAllowed("CREATE TABLE ... SKEWED BY", skewSpecContext)
    -    }
    -    if (bucketSpecContext != null) {
    -      operationNotAllowed("CREATE TABLE ... CLUSTERED BY", bucketSpecContext)
    -    }
    -    val cols = Option(columns).toSeq.flatMap(visitColTypeList)
    -    val properties = getPropertyKeyValues(tablePropertyList)
    -    // Ensuring whether no duplicate name is used in table definition
    -    val colNames =
    -    if (colNames.length != colNames.distinct.length) {
    -      val duplicateColumns = colNames.groupBy(identity).collect {
    -        case (x, ys) if ys.length > 1 => "\"" + x + "\""
    -      }
    -      operationNotAllowed(s"Duplicated column names found in table definition of " +
    -                          s"$tableIdentifier: ${duplicateColumns.mkString("[", ",", "]")}", columns)
    -    }
    -    val tablePath = if (locationSpecContext != null) {
    +    val cols: Seq[StructField] = Option(columns).toSeq.flatMap(visitColTypeList)
    +    val colNames: Seq[String] = CarbonSparkSqlParserUtil
    +      .validateCreateTableReqAndGetColumns(tableHeader,
    +        skewSpecContext,
    +        bucketSpecContext,
    +        columns,
    +        cols,
    +        tableIdentifier,
    +        temp)
    +    val tablePath: Option[String] = if (locationSpecContext != null) {
         } else {
         val tableProperties = mutable.Map[String, String]()
    +    val properties: Map[String, String] = getPropertyKeyValues(tablePropertyList)
         properties.foreach{property => tableProperties.put(property._1, property._2)}
         // validate partition clause
         val (partitionByStructFields, partitionFields) =
           validatePartitionFields(partitionColumns, colNames, tableProperties)
    -    // validate partition clause
    -    if (partitionFields.nonEmpty) {
    -      if (!CommonUtil.validatePartitionColumns(tableProperties, partitionFields)) {
    -         throw new MalformedCarbonCommandException("Error: Invalid partition definition")
    -      }
    -      // partition columns should not be part of the schema
    -      val badPartCols = partitionFields
    -        .map(_.partitionColumn.toLowerCase)
    -        .toSet
    -        .intersect(
    -      if (badPartCols.nonEmpty) {
    -        operationNotAllowed(s"Partition columns should not be specified in the schema: " +
    -                  "\"" + _ + "\"").mkString("[", ",", "]"),
    -          partitionColumns)
    -      }
    -    }
    -    val options = new CarbonOption(properties)
    -    // validate streaming property
    -    validateStreamingProperty(options)
    -    var fields = parser.getFields(cols ++ partitionByStructFields)
         // validate for create table as select
         val selectQuery = Option(query).map(plan)
    -    selectQuery match {
    -      case Some(q) =>
    -        // create table as select does not allow creation of partitioned table
    -        if (partitionFields.nonEmpty) {
    -          val errorMessage = "A Create Table As Select (CTAS) statement is not allowed to " +
    -                             "create a partitioned table using Carbondata file formats."
    -          operationNotAllowed(errorMessage, partitionColumns)
    -        }
    -        // create table as select does not allow to explicitly specify schema
    -        if (fields.nonEmpty) {
    -          operationNotAllowed(
    -            "Schema may not be specified in a Create Table As Select (CTAS) statement", columns)
    -        }
    -        // external table is not allow
    -        if (external) {
    -          operationNotAllowed("Create external table as select", tableHeader)
    -        }
    -        fields = parser
    -          .getFields(CarbonEnv.getInstance(sparkSession).carbonMetastore
    -            .getSchemaFromUnresolvedRelation(sparkSession, Some(q).get))
    -      case _ =>
    -        // ignore this case
    -    }
    -    if (partitionFields.nonEmpty && options.isStreaming) {
    -      operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
    -    }
    -    // validate tblProperties
    -    val bucketFields = parser.getBucketFields(tableProperties, fields, options)
    -    var isTransactionalTable : Boolean = true
    -    val tableInfo = if (external) {
    -      // read table info from schema file in the provided table path
    -      // external table also must convert table name to lower case
    -      val identifier = AbsoluteTableIdentifier.from(
    -        tablePath.get,
    -        CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession).toLowerCase(),
    -        tableIdentifier.table.toLowerCase())
    -      val table = try {
    -        val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
    -        if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
    -          if (provider.equalsIgnoreCase("'carbonfile'")) {
    -            SchemaReader.inferSchema(identifier, true)
    -          } else {
    -            isTransactionalTable = false
    -            SchemaReader.inferSchema(identifier, false)
    -          }
    -        }
    -        else {
    -          SchemaReader.getTableInfo(identifier)
    -        }
    -      }
    -        catch {
    -        case e: Throwable =>
    -          operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
    -      }
    -      // set "_external" property, so that DROP TABLE will not delete the data
    -      if (provider.equalsIgnoreCase("'carbonfile'")) {
    -        table.getFactTable.getTableProperties.put("_filelevelformat", "true")
    -        table.getFactTable.getTableProperties.put("_external", "false")
    -      } else {
    -        table.getFactTable.getTableProperties.put("_external", "true")
    -        table.getFactTable.getTableProperties.put("_filelevelformat", "false")
    -      }
    -      // setting local dictionary for all string coloumn for external table
    -      var isLocalDic_enabled = table.getFactTable.getTableProperties
    -        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
    -      if (null == isLocalDic_enabled) {
    -        table.getFactTable.getTableProperties
    -          .put(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
    -            CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT)
    -      }
    -      isLocalDic_enabled = table.getFactTable.getTableProperties
    -        .get(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE)
    -      if (CarbonScalaUtil.validateLocalDictionaryEnable(isLocalDic_enabled) &&
    -          isLocalDic_enabled.toBoolean) {
    -        val allcolumns = table.getFactTable.getListOfColumns
    -        for (i <- 0 until allcolumns.size()) {
    -          val cols = allcolumns.get(i)
    -          if (cols.getDataType == DataTypes.STRING || cols.getDataType == DataTypes.VARCHAR) {
    -            cols.setLocalDictColumn(true)
    -          }
    -          allcolumns.set(i, cols)
    -        }
    -        table.getFactTable.setListOfColumns(allcolumns)
    -      }
    -      table
    -    } else {
    -      // prepare table model of the collected tokens
    -      val tableModel: TableModel = parser.prepareTableModel(
    -        ifNotExists,
    -        convertDbNameToLowerCase(tableIdentifier.database),
    -        tableIdentifier.table.toLowerCase,
    -        fields,
    -        partitionFields,
    -        tableProperties,
    -        bucketFields,
    -        isAlterFlow = false,
    -        false,
    -        tableComment)
    -      TableNewProcessor(tableModel)
    -    }
    -    tableInfo.setTransactionalTable(isTransactionalTable)
    -    selectQuery match {
    -      case query@Some(q) =>
    -        CarbonCreateTableAsSelectCommand(
    -          tableInfo = tableInfo,
    -          query = query.get,
    -          ifNotExistsSet = ifNotExists,
    -          tableLocation = tablePath)
    -      case _ =>
    -        CarbonCreateTableCommand(
    -          tableInfo = tableInfo,
    -          ifNotExistsSet = ifNotExists,
    -          tableLocation = tablePath,
    -          external)
    -    }
    -  }
    -  private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {
    -    try {
    -      carbonOption.isStreaming
    -    } catch {
    -      case _: IllegalArgumentException =>
    -        throw new MalformedCarbonCommandException(
    -          "Table property 'streaming' should be either 'true' or 'false'")
    -    }
    +    val extraTableTuple = (cols, external, tableIdentifier, ifNotExists, colNames, tablePath,
    +      tableProperties, properties, partitionByStructFields, partitionFields,
    +      parser, sparkSession, selectQuery)
    +    CarbonSparkSqlParserUtil
    +      .createCarbonTable(createTableTuple, extraTableTuple)
       private def validatePartitionFields(
    --- End diff --
    This method now contains only one line of code, so that line can be moved into the caller and the method removed entirely.
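
    A minimal sketch of the suggestion above, using the duplicate-column check from the diff as the shared logic. All names here (ParserUtilSketch, BuilderBefore, BuilderAfter, validate) are hypothetical and are not part of CarbonData. The sketch also returns an Either where the real parser raises an error through operationNotAllowed, so it shows the shape of the refactoring rather than the actual implementation.

```scala
// Hypothetical stand-in for a shared utility object; not the real CarbonData API.
object ParserUtilSketch {
  // Returns the quoted names that appear more than once, mirroring the
  // groupBy(identity) check that the diff removes from the parser.
  def duplicateColumns(colNames: Seq[String]): Seq[String] =
    colNames.groupBy(identity).collect {
      case (name, occurrences) if occurrences.length > 1 => "\"" + name + "\""
    }.toSeq.sorted

  // Left carries the error message; Right carries the validated names.
  // Assumption: Either replaces the exception thrown by the real code.
  def validateColumns(table: String, colNames: Seq[String]): Either[String, Seq[String]] = {
    val duplicates = duplicateColumns(colNames)
    if (duplicates.isEmpty) {
      Right(colNames)
    } else {
      Left("Duplicated column names found in table definition of " +
        s"$table: ${duplicates.mkString("[", ",", "]")}")
    }
  }
}

// Before: a private method whose body is a single delegating call.
class BuilderBefore {
  def visitCreateTable(table: String, colNames: Seq[String]): Either[String, Seq[String]] =
    validate(table, colNames)

  private def validate(table: String, colNames: Seq[String]): Either[String, Seq[String]] =
    ParserUtilSketch.validateColumns(table, colNames)
}

// After: the single line lives in the caller and the wrapper method is gone.
class BuilderAfter {
  def visitCreateTable(table: String, colNames: Seq[String]): Either[String, Seq[String]] =
    ParserUtilSketch.validateColumns(table, colNames)
}
```

    Both builders behave the same for any input; the second simply has one less level of indirection to read through.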


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by ravipesala <>.
Github user ravipesala commented on the issue:
    SDV Build Success, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by ravipesala <>.
Github user ravipesala commented on the issue:
    SDV Build Success, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by ravipesala <>.
Github user ravipesala commented on the issue:
    SDV Build Success, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.2.1, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.1.0, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by mohammadshahidkhan <>.
Github user mohammadshahidkhan commented on the issue:
    retest SDV please


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.2.1, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Failed with Spark 2.2.1, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by ravipesala <>.
Github user ravipesala commented on the issue:
    SDV Build Success, Please check CI


[GitHub] carbondata pull request #2466: [CARBONDATA-2710][Spark Integration] Refactor...

Posted by asfgit <>.
Github user asfgit closed the pull request at:


[GitHub] carbondata issue #2466: [WIP][CARBONDATA-2710][Spark Integration] Refactor C...

Posted by ravipesala <>.
Github user ravipesala commented on the issue:
    SDV Build Fail, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by ravipesala <>.
Github user ravipesala commented on the issue:
    SDV Build Fail, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.1.0, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by mohammadshahidkhan <>.
Github user mohammadshahidkhan commented on the issue:
    The failed test case in the build below is unrelated to this change. The same test case passes locally.


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Success with Spark 2.2.1, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Failed with Spark 2.1.0, Please check CI


[GitHub] carbondata issue #2466: [CARBONDATA-2710][Spark Integration] Refactor Carbon...

Posted by CarbonDataQA <>.
Github user CarbonDataQA commented on the issue:
    Build Failed with Spark 2.1.0, Please check CI


[GitHub] carbondata issue #2466: [WIP][CARBONDATA-2710][Spark Integration] Refactor C...

Posted by mohammadshahidkhan <>.
Github user mohammadshahidkhan commented on the issue:
    retest SDV please
