Posted to issues@carbondata.apache.org by kunal642 <gi...@git.apache.org> on 2018/03/27 22:42:21 UTC

[GitHub] carbondata pull request #2109: [WIP] Partition preaggregate support

GitHub user kunal642 opened a pull request:

    https://github.com/apache/carbondata/pull/2109

    [WIP] Partition preaggregate support

    Be sure to complete all of the following checklist items to help us incorporate 
    your contribution quickly and easily:
    
     - [ ] Any interfaces changed?
     
     - [ ] Any backward compatibility impacted?
     
     - [ ] Document update required?
    
     - [ ] Testing done
            Please provide details on:
            - Whether new unit test cases have been added, or why no new tests are required.
            - How was it tested? Please attach the test report.
            - Is it a performance-related change? Please attach the performance test report.
            - Any additional information to help reviewers test this change.
           
     - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA. 
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kunal642/carbondata partition_preaggregate_support

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/carbondata/pull/2109.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2109
    
----

----


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    retest sdv please


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4823/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178219012
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.spark.sql.execution.command.partition
     
     import java.util
    +import java.util.UUID
    --- End diff --
    
    Do you allow dropping partitions directly on the aggregate table? I mean not through the parent table.


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4168/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178775449
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          command =>
    +            val carbonTable = command.table
    +            // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
    +            val backupTableSchemaPath =
    +              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
    +            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +      }
    +      // after success/failure of commit delete all tablestatus files with UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    --- End diff --
    
    1. Make this call in a finally block (see the sketch below).
    2. We need to have a mechanism for clean-up during the clean operation. **For this we can raise a JIRA to track the issue**
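
    A minimal sketch of the restructuring suggested in point 1, assuming the names
    from the diff above; the three function parameters are hypothetical stand-ins
    for the listener's own rename/revert/cleanup steps:

        // Commit, revert on failure, and always clean up stale UUID/backup files.
        def commitWithCleanup(
            commit: () => Boolean,  // e.g. renaming tablestatus_UUID to tablestatus
            revert: () => Unit,     // e.g. restoring tablestatus from _backup_UUID
            cleanup: () => Unit): Unit = {  // e.g. cleanUpStaleTableStatusFiles(...)
          var commitFailed = true
          try {
            commitFailed = !commit()
            if (commitFailed) revert()
          } finally {
            // runs on success and on failure, so no stale files are left behind
            cleanup()
          }
          if (commitFailed) {
            sys.error("Failed to update table status for pre-aggregate table")
          }
        }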


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3580/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178719972
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
    @@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
         segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
       }
     
    +  test("test auto compaction on aggregate table") {
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
    +    segmentNamesSum.sorted should equal  (Array("0", "0.1", "1", "2", "3"))
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
    --- End diff --
    
    1. Wrap the test case in try/finally (see the sketch below). In case of any failure the property needs to be reset to its default value, else subsequent test cases might fail.
    2. Replace false with the default value from CarbonCommonConstants.
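
    A sketch of the suggested rewrite, assuming the QueryTest helpers used in this
    file and that CarbonCommonConstants exposes an ENABLE_AUTO_LOAD_MERGE_DEFAULT
    constant (an assumption; otherwise substitute the documented default "false"):

        test("test auto compaction on aggregate table") {
          CarbonProperties.getInstance()
            .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
          try {
            // four loads so that auto compaction produces the merged 0.1 segment
            (1 to 4).foreach { _ =>
              sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
            }
            val segmentNamesSum = sql("show segments for table maintable_preagg_sum")
              .collect().map(_.get(0).toString)
            segmentNamesSum.sorted should equal(Array("0", "0.1", "1", "2", "3"))
          } finally {
            // reset even if an assertion fails, so later test cases are unaffected
            CarbonProperties.getInstance().addProperty(
              CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
              CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE_DEFAULT)
          }
        }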


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4689/



---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4158/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4658/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4197/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178217935
  
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
    @@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       override def executeCompaction(): Unit = {
         val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
         val loadMetaDataDetails = identifySegmentsToBeMerged()
    -    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    // If segmentFile name is specified in load details then segment is for partition table
    +    // therefore the segment file name should be loadName#segmentFileName.segment
    +    val segments = loadMetaDataDetails.asScala.map {
    +      loadDetail => if (loadDetail.getSegmentFile != null) {
    +        loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
    +      } else {
    +        loadDetail.getLoadName
    +      }
    --- End diff --
    
    Just use new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString instead of doing this
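
    A sketch of that suggestion, assuming Segment is
    org.apache.carbondata.core.datamap.Segment and that its toString emits
    "loadName#segmentFileName" when a segment file is set:

        // Let Segment.toString build the partition-aware name instead of
        // concatenating loadName and segmentFile by hand.
        val segments = loadMetaDataDetails.asScala.map { loadDetail =>
          new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString
        }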


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4692/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178787300
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -62,8 +63,25 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val parentPartitionColumns = if (parentTable.isHivePartitionTable) {
    +      partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +    } else {
    +      Seq()
    +    }
    +    // Generate child table partition columns in the same order as the parent table.
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if parentPartitionColumns
    +        .exists(parentCol =>
    +          parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +        (PartitionerField(field.name.get,
    +          field.dataType,
    +          field.columnComment), parentPartitionColumns
    +          .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
    +    }.toSeq.sortBy(_._2).map(_._1)
    --- End diff --
    
    I think the sortBy and map operations are not required here, as the child datamap ordering already follows the parent table partition column ordering (see the sketch below).
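
    A sketch of the proposed simplification, assuming (as stated above) that
    fieldRelationMap already iterates in parent partition-column order:

        // Keep only non-aggregated fields that map to a parent partition column;
        // rely on the existing ordering instead of re-sorting by parent index.
        val partitionerFields = fieldRelationMap.collect {
          case (field, dataMapField)
            if dataMapField.aggregateFunction.isEmpty &&
               parentPartitionColumns.contains(
                 dataMapField.columnTableRelationList.get.head.parentColumnName) =>
            PartitionerField(field.name.get, field.dataType, field.columnComment)
        }.toSeq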


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4259/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4761/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4193/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801725
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionMetaListener extends OperationEventListener{
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
    +    val parentCarbonTable = dropPartitionEvent.parentCarbonTable
    +    val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
    +    val sparkSession = SparkSession.getActiveSession.get
    +    if (parentCarbonTable.hasAggregationDataMap) {
    +      // used as a flag to block direct drop partition on aggregate tables fired by the user
    +      operationContext.setProperty("isInternalDropCall", "true")
    +      // Filter out all the tables which dont have the partition being dropped.
    +      val childTablesWithoutPartitionColumns =
    +        parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema =>
    +          val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala
    +          val partitionColExists = partitionsToBeDropped.forall {
    +            partition =>
    +              childColumns.exists { childColumn =>
    +                  childColumn.getAggFunction.isEmpty &&
    +                  childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
    +                    equals(partition)
    +              }
    +          }
    +          !partitionColExists
    +      }
    +      if (childTablesWithoutPartitionColumns.nonEmpty) {
    +        throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" +
    +                                           s" participating in the following datamaps ${
    +          childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
    --- End diff --
    
    Verified


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178218263
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
         val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
         carbonProperty.addProperty("zookeeper.enable.lock", "false")
    -
    +    currPartitions = if (table.isHivePartitionTable) {
    --- End diff --
    
    This is a metastore call; it is not supposed to be kept under processData.
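
    Sketched, the lookup would move out of processData and into the metadata phase
    (fetchCurrentPartitions is a hypothetical stand-in for however the metastore is
    actually queried):

        // Resolve the current partitions once during processMetadata, so that
        // processData performs no metastore round-trips.
        override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
          currPartitions = if (table.isHivePartitionTable) {
            fetchCurrentPartitions(sparkSession, table)  // hypothetical helper
          } else {
            null
          }
          Seq.empty
        }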


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4763/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3512/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801685
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -62,8 +63,25 @@ case class PreAggregateTableHelper(
    +    // Generate child table partition columns in the same order as the parent table.
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if parentPartitionColumns
    +        .exists(parentCol =>
    +          parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +        (PartitionerField(field.name.get,
    +          field.dataType,
    +          field.columnComment), parentPartitionColumns
    +          .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
    +    }.toSeq.sortBy(_._2).map(_._1)
    --- End diff --
    
    removed sortBy


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4267/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3463/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3565/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178219637
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    --- End diff --
    
    just use parentTable.isHivePartitionTable


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4244/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178727711
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
    @@ -0,0 +1,488 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain id copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.carbondata.spark.testsuite.standardpartition
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
    +
    +  val testData = s"$resourcesPath/sample.csv"
    +
    +  override def beforeAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("create database partition_preaggregate")
    +    sql("use partition_preaggregate")
    +    sql(
    +      """
    +        | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(
    +      """
    +        | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +  }
    +
    +  override def afterAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("use default")
    +  }
    +
    +  // Create aggregate table on partition with partition column in aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as aggregation") {
    +    sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column in projection and aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as projection") {
    +    sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column as group by.
    +  test("test preaggregate table creation on partition table with partition col as group by") {
    +    sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition without partition column.
    +  test("test preaggregate table creation on partition table without partition column") {
    +    sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +  
    +  test("test data correction in aggregate table when partition column is used") {
    +    sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age")
    +    checkAnswer(sql("select * from maintable_p1"),
    +      Seq(Row(1,31,31),
    +        Row(2,27,27),
    +        Row(3,70,35),
    +        Row(4,26,26),
    +        Row(4,29,29)))
    +    preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1")
    +    sql("drop datamap p1 on table maintable")
    +  }
    +
    +  test("test data correction in aggregate table when partition column is not used") {
    +    sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id")
    +    checkAnswer(sql("select * from maintable_p2"),
    +      Seq(Row(1,31),
    +        Row(2,27),
    +        Row(3,35),
    +        Row(4,29)))
    +    preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2")
    +    sql("drop datamap p2 on table maintable")
    +  }
    +
    +  test("test data correction with insert overwrite") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2014,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1)))
    +    checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1)))
    +  }
    +
    +  test("test data correction with insert overwrite on different value") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2015,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1)))
    --- End diff --
    
    After the insert overwrite operation only one row should be returned (Row("v",2015,1,1)). Is my understanding correct?


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801779
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +      // after success/failure of commit delete all tablestatus files with UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    --- End diff --
    
    Added TODO


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4169/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3465/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3459/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4688/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801803
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    --- End diff --
    
    done


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3439/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4690/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4266/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3533/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4194/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4269/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178780592
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    --- End diff --
    
    Make this an info or error log instead of a warn.


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4199/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3532/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4196/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801858
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
             table.getDatabaseName,
             table.getTableName,
             locksToBeAcquired)(sparkSession)
    +      // If flow is for child table then get the uuid from operation context.
    +      // If flow is for parent table then generate uuid for child flows and set the uuid to ""
    +      // for parent table
    +      // If normal table then set uuid to "".
    +      val uuid = if (table.isChildDataMap) {
    +        Option(operationContext.getProperty("uuid")).getOrElse("").toString
    --- End diff --
    
    Logged the message


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178795973
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
    @@ -0,0 +1,488 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain id copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.carbondata.spark.testsuite.standardpartition
    +
    +import scala.collection.JavaConverters._
    +
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
    +
    +  val testData = s"$resourcesPath/sample.csv"
    +
    +  override def beforeAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("create database partition_preaggregate")
    +    sql("use partition_preaggregate")
    +    sql(
    +      """
    +        | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(
    +      """
    +        | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +  }
    +
    +  override def afterAll(): Unit = {
    +    sql("drop database if exists partition_preaggregate cascade")
    +    sql("use default")
    +  }
    +
    +  // Create aggregate table on partition with partition column in aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as aggregation") {
    +    sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column in projection and aggregation only.
    +  test("test preaggregate table creation on partition table with partition col as projection") {
    +    sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition with partition column as group by.
    +  test("test preaggregate table creation on partition table with partition col as group by") {
    +    sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ")
    +    assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +
    +  // Create aggregate table on partition without partition column.
    +  test("test preaggregate table creation on partition table without partition column") {
    +    sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ")
    +    assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable)
    +  }
    +  
    +  test("test data correction in aggregate table when partition column is used") {
    +    sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age")
    +    checkAnswer(sql("select * from maintable_p1"),
    +      Seq(Row(1,31,31),
    +        Row(2,27,27),
    +        Row(3,70,35),
    +        Row(4,26,26),
    +        Row(4,29,29)))
    +    preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1")
    +    sql("drop datamap p1 on table maintable")
    +  }
    +
    +  test("test data correction in aggregate table when partition column is not used") {
    +    sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id")
    +    checkAnswer(sql("select * from maintable_p2"),
    +      Seq(Row(1,31),
    +        Row(2,27),
    +        Row(3,35),
    +        Row(4,29)))
    +    preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2")
    +    sql("drop datamap p2 on table maintable")
    +  }
    +
    +  test("test data correction with insert overwrite") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2014,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1)))
    +    checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1)))
    +  }
    +
    +  test("test data correction with insert overwrite on different value") {
    +    sql("drop table if exists partitionone")
    +    sql(
    +      """
    +        | CREATE TABLE if not exists partitionone (empname String)
    +        | PARTITIONED BY (year int, month int,day int)
    +        | STORED BY 'org.apache.carbondata.format'
    +      """.stripMargin)
    +    sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
    +    sql("insert into partitionone values('k',2014,1,1)")
    +    sql("insert overwrite table partitionone values('v',2015,1,1)")
    +    checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1)))
    --- End diff --
    
    In case of partition tables, overwrite works at the per-partition level, not at the table level, so only the matching partitions are overwritten.
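    For example, contrasting the two cases from the tests above (same partitionone table; just a sketch):

        // same partition (2014,1,1): overwrite replaces exactly that partition
        sql("insert into partitionone values('k',2014,1,1)")
        sql("insert overwrite table partitionone values('v',2014,1,1)")
        checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1)))

        // different partition (2015,1,1): nothing matches, so the old row survives
        sql("insert into partitionone values('k',2014,1,1)")
        sql("insert overwrite table partitionone values('v',2015,1,1)")
        checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1)))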


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178239706
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    --- End diff --
    
    done


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3608/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3535/



---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3423/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4243/



---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3422/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178223172
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if childPartitionColumns
    +        .exists(childCol =>
    +          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +      PartitionerField(field.name.get, field.dataType, field.columnComment)
    +    }.toSeq
    --- End diff --
    
    Please add some tests to make sure the order of the partition columns is the same as the parent partition columns.
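    A minimal sketch of such a test (assuming the suite's partition_preaggregate database and the parentTable_column naming that pre-aggregate child tables use):

        test("test child partition column order matches parent") {
          sql("drop table if exists order_check")
          sql(
            """
              | CREATE TABLE order_check (empname String)
              | PARTITIONED BY (year int, month int, day int)
              | STORED BY 'org.apache.carbondata.format'
            """.stripMargin)
          sql("create datamap po on table order_check using 'preaggregate' as " +
            "select empname, sum(day) from order_check group by empname, year, month, day")
          val childTable = CarbonEnv.getCarbonTable(
            Some("partition_preaggregate"), "order_check_po")(sqlContext.sparkSession)
          val childPartitionCols =
            childTable.getPartitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
          // child must keep the parent's declaration order: year, month, day
          assert(childPartitionCols.map(_.stripPrefix("order_check_")) == Seq("year", "month", "day"))
        }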


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178222428
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    --- End diff --
    
    Why do we need to extract the columns again? Can't you get them from fieldRelationMap?


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4685/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801747
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate table status file name without UUID, forExample: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    --- End diff --
    
    done



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4291/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3509/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4656/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3528/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178222846
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if childPartitionColumns
    +        .exists(childCol =>
    +          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +      PartitionerField(field.name.get, field.dataType, field.columnComment)
    +    }.toSeq
    --- End diff --
    
    The order of partition columns is important. Please make sure that the parent partition column order and the child partition column order are the same.
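    For example, a rough sketch that builds the fields by walking the parent's partition columns in order (same identifiers as the diff above):

        // iterate parentPartitionColumns in declaration order so the child
        // table keeps exactly the same partition column order as the parent
        val partitionerFields = parentPartitionColumns.flatMap { parentCol =>
          fieldRelationMap.collectFirst {
            case (field, dataMapField)
              if dataMapField.columnTableRelationList.get.head.parentColumnName == parentCol &&
                 dataMapField.aggregateFunction.isEmpty =>
              PartitionerField(field.name.get, field.dataType, field.columnComment)
          }
        }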


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4756/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178767625
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event
    +   * @param operationContext
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    --- End diff --
    
    Write a comment to explain when uuid will be empty and when it will be non-empty.
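    For example, something along these lines (wording based on the uuid handling described elsewhere in this PR):

        // uuid is non-empty only when this commit is for a child (pre-aggregate)
        // table: the parent flow generates a uuid and the child writes its table
        // status as tablestatus_<uuid>, which is renamed here on commit. For the
        // parent table and for normal tables uuid is "", so no backup postfix.
        val backUpPostFix = if (uuid.nonEmpty) {
          "_backup_" + uuid
        } else {
          ""
        }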


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4803/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801883
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
       override def processData(sparkSession: SparkSession): Seq[Row] = {
         var locks = List.empty[ICarbonLock]
         val uniqueId = System.currentTimeMillis().toString
    +    val childCommands = operationContext.getProperty("dropPartitionCommands")
    --- End diff --
    
    done


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4789/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3600/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4302/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4200/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178801930
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
    @@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
         segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
       }
     
    +  test("test auto compaction on aggregate table") {
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
    +    val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
    +    segmentNamesSum.sorted should equal  (Array("0", "0.1", "1", "2", "3"))
    +    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
    --- End diff --
    
    added in afterAll
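    i.e. something along these lines (assuming the default is restored from CarbonCommonConstants):

        override def afterAll(): Unit = {
          // restore the default so auto compaction does not leak into other suites
          CarbonProperties.getInstance().addProperty(
            CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
            CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
          sql("drop table if exists maintable")
        }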


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4736/



---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4160/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    retest this please


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178217187
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
    @@ -0,0 +1,440 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain id copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.carbondata.spark.testsuite.standardpartition
    +
    +import java.io.{File, FileWriter}
    +
    +import org.apache.commons.io.FileUtils
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
    --- End diff --
    
    Please handle and add overwrite test cases for the partition with pre-aggregate scenario.


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3461/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    retest sdv please


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4303/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3584/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4686/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3458/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4261/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4177/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178239696
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    --- End diff --
    
    removed this method call


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178237981
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
    @@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
         val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
         val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
         carbonProperty.addProperty("zookeeper.enable.lock", "false")
    -
    +    currPartitions = if (table.isHivePartitionTable) {
    --- End diff --
    
    Actually I had to move getPartitions to processData because when this method is called for child tables we expect validSegments to be set; in processMetadata they are not set yet. I have passed carbonTable to avoid a metastore call. Should not cause problems.
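    Roughly (getPartitions below is just a hypothetical stand-in for the partition resolution that used to run in processMetadata):

        override def processData(sparkSession: SparkSession): Seq[Row] = {
          // for child tables validSegments are set only by the time processData
          // runs (not in processMetadata), so resolve partitions here
          currPartitions = if (table.isHivePartitionTable) {
            getPartitions(sparkSession, table) // hypothetical helper name
          } else {
            null
          }
          // ... rest of the load flow unchanged
        }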


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4831/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4665/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178264753
  
    --- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
    @@ -0,0 +1,440 @@
    +/*
    +* Licensed to the Apache Software Foundation (ASF) under one or more
    +* contributor license agreements.  See the NOTICE file distributed with
    +* this work for additional information regarding copyright ownership.
    +* The ASF licenses this file to You under the Apache License, Version 2.0
    +* (the "License"); you may not use this file except in compliance with
    +* the License.  You may obtain id copy of the License at
    +*
    +*    http://www.apache.org/licenses/LICENSE-2.0
    +*
    +* Unless required by applicable law or agreed to in writing, software
    +* distributed under the License is distributed on an "AS IS" BASIS,
    +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +* See the License for the specific language governing permissions and
    +* limitations under the License.
    +*/
    +
    +package org.apache.carbondata.spark.testsuite.standardpartition
    +
    +import java.io.{File, FileWriter}
    +
    +import org.apache.commons.io.FileUtils
    +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    +import org.apache.spark.sql.execution.datasources.LogicalRelation
    +import org.apache.spark.sql.hive.CarbonRelation
    +import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
    +import org.apache.spark.sql.test.util.QueryTest
    +import org.scalatest.BeforeAndAfterAll
    +
    +class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
    --- End diff --
    
    added insert overwrite test case


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4760/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178264718
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    +    val partitionerFields = fieldRelationMap.collect {
    +      case (field, dataMapField) if childPartitionColumns
    +        .exists(childCol =>
    +          childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
    +          dataMapField.aggregateFunction.isEmpty) =>
    +      PartitionerField(field.name.get, field.dataType, field.columnComment)
    +    }.toSeq
    --- End diff --
    
    done


---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4649/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4264/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4265/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178238607
  
    --- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
    @@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
       override def executeCompaction(): Unit = {
         val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
         val loadMetaDataDetails = identifySegmentsToBeMerged()
    -    val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
    +    // If segmentFile name is specified in load details then segment is for partition table
    +    // therefore the segment file name should be loadName#segmentFileName.segment
    +    val segments = loadMetaDataDetails.asScala.map {
    +      loadDetail => if (loadDetail.getSegmentFile != null) {
    +        loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
    +      } else {
    +        loadDetail.getLoadName
    +      }
    --- End diff --
    
    ok


---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4648/



---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4127/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4762/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178758267
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
       override def processData(sparkSession: SparkSession): Seq[Row] = {
         var locks = List.empty[ICarbonLock]
         val uniqueId = System.currentTimeMillis().toString
    +    val childCommands = operationContext.getProperty("dropPartitionCommands")
    --- End diff --
    
    Remove as unused


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3462/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3536/



---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4623/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4808/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178222584
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
    @@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
         val df = sparkSession.sql(updatedQuery)
         val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
           df.logicalPlan, queryString)
    +    val partitionInfo = parentTable.getPartitionInfo
         val fields = fieldRelationMap.keySet.toSeq
         val tableProperties = mutable.Map[String, String]()
    +    val childPartitionColumns = if (partitionInfo != null &&
    +                              partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
    +      val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
    +      PreAggregateUtil
    +        .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
    +    } else {Seq()}
    --- End diff --
    
    format it properly


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4764/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178719497
  
    --- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
    @@ -130,15 +130,20 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws
               loadModel.getTablePath());
           newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
         }
    +    OperationContext operationContext = (OperationContext) getOperationContext();
    +    String uuid = "";
    +    if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() &&
    +        operationContext != null) {
    +      uuid = operationContext.getProperty("uuid").toString();
    +    }
         CarbonLoaderUtil
             .populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
                 true);
         CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
         long segmentSize = CarbonLoaderUtil
             .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
         if (segmentSize > 0 || overwriteSet) {
    -      Object operationContext = getOperationContext();
    -      if (operationContext != null) {
    +      if (operationContext != null && carbonTable.hasAggregationDataMap()) {
             ((OperationContext) operationContext)
    --- End diff --
    
    Remove the typecast here as well; operationContext is already declared as OperationContext above.
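    For example (a sketch; the statement after the cast is truncated in the diff above, so the
    property name used here is hypothetical):

        if (operationContext != null && carbonTable.hasAggregationDataMap) {
          // no cast needed; operationContext is already declared as OperationContext
          operationContext.setProperty("commitComplete", "true"); // hypothetical property
        }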


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178761202
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
             table.getDatabaseName,
             table.getTableName,
             locksToBeAcquired)(sparkSession)
    +      // If the flow is for a child table then get the uuid from the operation context.
    +      // If the flow is for a parent table then generate a uuid for the child flows and set
    +      // the uuid to "" for the parent table.
    +      // If it is a normal table then set the uuid to "".
    +      val uuid = if (table.isChildDataMap) {
    +        Option(operationContext.getProperty("uuid")).getOrElse("").toString
    --- End diff --
    
    In the isChildDataMap case it is mandatory to get the UUID set by the parent table. If it is not set, log a message here, or better, throw an exception from here.
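    A sketch of what I mean (untested; the parent/normal table branches stay as in the current
    diff and are elided here):

        val uuid = if (table.isChildDataMap) {
          Option(operationContext.getProperty("uuid")) match {
            case Some(id) => id.toString
            case None =>
              // the parent flow must set the uuid before invoking the child command
              throw new RuntimeException(
                "UUID not set by the parent table flow for the child datamap")
          }
        } else {
          // parent / normal table handling unchanged
          ""
        }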


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4268/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178784636
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
    @@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
     import org.apache.carbondata.events._
     import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
     
    +object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event            the drop partition pre status event carrying the parent table
    +   * @param operationContext the context shared across the listeners of this operation
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
    +    val carbonTable = preStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
    +    }
    +  }
    +}
    +
    +trait CommitHelper {
    +
    +  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
    +
    +  protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
    +      operationContext: OperationContext,
    +      carbonTable: CarbonTable): Unit = {
    +    val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
    +    val segmentBeingLoaded =
    +      operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
    +    val newDetails = loadMetaDataDetails.collect {
    +      case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
    +        detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
    +        detail
    +      case others => others
    +    }
    +    SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
    +  }
    +
    +  /**
    +   *  Used to rename table status files for commit operation.
    +   */
    +  protected def renameDataMapTableStatusFiles(sourceFileName: String,
    +      destinationFileName: String, uuid: String): Boolean = {
    +    val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
    +    val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
    +    if (oldCarbonFile.exists() && newCarbonFile.exists()) {
    +      val backUpPostFix = if (uuid.nonEmpty) {
    +        "_backup_" + uuid
    +      } else {
    +        ""
    +      }
    +      LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
    +      if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
    +        LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
    +        oldCarbonFile.renameForce(destinationFileName)
    +      } else {
    +        LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
    +        false
    +      }
    +    } else {
    +      false
    +    }
    +  }
    +
    +  /**
    +   * Used to remove table status files with UUID and segment folders.
    +   */
    +  protected def cleanUpStaleTableStatusFiles(
    +      childTables: Seq[CarbonTable],
    +      operationContext: OperationContext,
    +      uuid: String): Unit = {
    +    childTables.foreach { childTable =>
    +      val metaDataDir = FileFactory.getCarbonFile(
    +        CarbonTablePath.getMetadataPath(childTable.getTablePath))
    +      val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
    +        override def accept(file: CarbonFile): Boolean = {
    +          file.getName.contains(uuid) || file.getName.contains("backup")
    +        }
    +      })
    +      tableStatusFiles.foreach(_.delete())
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event            the drop partition post status event carrying the parent table
    +   * @param operationContext the context carrying the uuid and the child drop partition commands
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
    +    val carbonTable = postStatusListener.carbonTable
    +    val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
    +    val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
    +    if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
    +      val childCommands =
    +        childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
    +      val renamedDataMaps = childCommands.takeWhile {
    +        childCommand =>
    +          val childCarbonTable = childCommand.table
    +          val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
    +            childCarbonTable.getTablePath, uuid)
    +          // Generate the table status file name without UUID, for example: tablestatus
    +          val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
    +            childCarbonTable.getTablePath)
    +          renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
    +      }
    +      // if true then the commit for one of the child tables has failed
    +      val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
    +      if (commitFailed) {
    +        LOGGER.warn("Reverting table status file to original state")
    +        renamedDataMaps.foreach {
    +          command =>
    +            val carbonTable = command.table
    +            // rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
    +            val backupTableSchemaPath =
    +              CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
    +            val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
    +            renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
    +        }
    +      }
    +      // after success/failure of commit delete all tablestatus files with UUID in their names.
    +      // if commit failed then remove the segment directory
    +      cleanUpStaleTableStatusFiles(childCommands.map(_.table),
    +        operationContext,
    +        uuid)
    +      if (commitFailed) {
    +        sys.error("Failed to update table status for pre-aggregate table")
    +      }
    +    }
    +  }
    +}
    +
    +object AlterTableDropPartitionMetaListener extends OperationEventListener{
    +  /**
    +   * Called on a specified event occurrence
    +   *
    +   * @param event            the drop partition meta event carrying the parent table and specs
    +   * @param operationContext the context used to pass flags to the child flows
    +   */
    +  override protected def onEvent(event: Event,
    +      operationContext: OperationContext) = {
    +    val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
    +    val parentCarbonTable = dropPartitionEvent.parentCarbonTable
    +    val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
    +    val sparkSession = SparkSession.getActiveSession.get
    +    if (parentCarbonTable.hasAggregationDataMap) {
    +      // used as a flag to block direct drop partition on aggregate tables fired by the user
    +      operationContext.setProperty("isInternalDropCall", "true")
    +      // Collect the datamaps whose child tables do not contain the partition columns
    +      // being dropped.
    +      val childTablesWithoutPartitionColumns =
    +        parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema =>
    +          val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala
    +          val partitionColExists = partitionsToBeDropped.forall {
    +            partition =>
    +              childColumns.exists { childColumn =>
    +                  childColumn.getAggFunction.isEmpty &&
    +                  childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
    +                    equals(partition)
    +              }
    +          }
    +          !partitionColExists
    +      }
    +      if (childTablesWithoutPartitionColumns.nonEmpty) {
    +        throw new MetadataProcessException(s"Cannot drop partition as one of the partitions is not" +
    +                                           s" participating in the following datamaps ${
    +          childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
    --- End diff --
    
    Make sure this list is printed to the console output.
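    For example (a sketch; assumes a LOGGER is in scope for this listener):

        val skippedDataMaps =
          childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
        val msg = s"Cannot drop partition as one of the partitions is not participating" +
                  s" in the following datamaps $skippedDataMaps"
        LOGGER.error(msg) // surfaces the datamap list on the console/driver log
        throw new MetadataProcessException(msg)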


---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3534/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3431/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed  with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4739/



---

[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3429/



---

[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support

Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:

    https://github.com/apache/carbondata/pull/2109
  
    Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3398/



---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:

    https://github.com/apache/carbondata/pull/2109#discussion_r178237773
  
    --- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
    @@ -18,6 +18,7 @@
     package org.apache.spark.sql.execution.command.partition
     
     import java.util
    +import java.util.UUID
    --- End diff --
    
    Not allowed. A drop partition fired directly on an aggregate table is blocked; only the internal flow triggered from the parent table (flagged via isInternalDropCall) can drop its partitions.
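    Roughly, the guard looks like this (a sketch; the exact placement inside the drop partition
    command is assumed):

        if (table.isChildDataMap &&
            operationContext.getProperty("isInternalDropCall") == null) {
          throw new UnsupportedOperationException(
            "Cannot drop a partition directly on a pre-aggregate table")
        }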


---

[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/carbondata/pull/2109


---