Posted to issues@carbondata.apache.org by kunal642 <gi...@git.apache.org> on 2018/03/27 22:42:21 UTC
[GitHub] carbondata pull request #2109: [WIP] Partition preaggregate support
GitHub user kunal642 opened a pull request:
https://github.com/apache/carbondata/pull/2109
[WIP] Partition preaggregate support
Be sure to complete all of the following checklist items to help us incorporate
your contribution quickly and easily:
- [ ] Any interfaces changed?
- [ ] Any backward compatibility impacted?
- [ ] Document update required?
- [ ] Testing done
Please provide details on
- Whether new unit test cases have been added or why no new tests are required?
- How it is tested? Please attach test report.
- Is it a performance related change? Please attach the performance test report.
- Any additional information to help reviewers in testing this change.
- [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/kunal642/carbondata partition_preaggregate_support
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/2109.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2109
----
----
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/2109
retest sdv please
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4823/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178219012
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.command.partition
import java.util
+import java.util.UUID
--- End diff --
Do you allow partitions to be dropped directly on the aggregate table, i.e. not through the parent table?
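For context, `AlterTableDropPartitionMetaListener` (quoted later in this thread) sets an `isInternalDropCall` property on the `OperationContext` to tell internal drops apart from user-issued ones. A minimal sketch of how such a flag could be checked, assuming a plain `mutable.Map` stands in for `OperationContext` and a hypothetical `isAggregateChild` flag marks child tables. This is an illustration, not CarbonData's actual check:

```scala
import scala.collection.mutable

// Illustrative request model; `isAggregateChild` is a hypothetical flag, not a CarbonData field.
final case class DropPartitionRequest(tableName: String, isAggregateChild: Boolean)

object DirectDropGuard {
  // Property name as used in the PR's listener; the plain Map stands in for OperationContext.
  val InternalDropFlag = "isInternalDropCall"

  /** Right(()) if the drop may proceed, Left(reason) if a user-issued drop hits a child table. */
  def validate(req: DropPartitionRequest,
      context: mutable.Map[String, String]): Either[String, Unit] = {
    val isInternal = context.get(InternalDropFlag).contains("true")
    if (req.isAggregateChild && !isInternal) {
      Left(s"Cannot drop partition directly on aggregate table ${req.tableName}; " +
        "drop it on the parent table instead")
    } else {
      Right(())
    }
  }
}
```

Drops on the parent table always pass; a drop on a child table passes only when the parent's listener has marked the call as internal.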
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4168/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178775449
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+ val carbonTable = preStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+ }
+ }
+}
+
+trait CommitHelper {
+
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+ protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+ operationContext: OperationContext,
+ carbonTable: CarbonTable): Unit = {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+ val segmentBeingLoaded =
+ operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+ val newDetails = loadMetaDataDetails.collect {
+ case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+ detail
+ case others => others
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+ }
+
+ /**
+ * Used to rename table status files for commit operation.
+ */
+ protected def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String): Boolean = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
+ ""
+ }
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+ if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+ LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+ oldCarbonFile.renameForce(destinationFileName)
+ } else {
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Used to remove table status files with UUID and segment folders.
+ */
+ protected def cleanUpStaleTableStatusFiles(
+ childTables: Seq[CarbonTable],
+ operationContext: OperationContext,
+ uuid: String): Unit = {
+ childTables.foreach { childTable =>
+ val metaDataDir = FileFactory.getCarbonFile(
+ CarbonTablePath.getMetadataPath(childTable.getTablePath))
+ val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.contains(uuid) || file.getName.contains("backup")
+ }
+ })
+ tableStatusFiles.foreach(_.delete())
+ }
+ }
+}
+
+object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
+ val carbonTable = postStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ val renamedDataMaps = childCommands.takeWhile {
+ childCommand =>
+ val childCarbonTable = childCommand.table
+ val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ childCarbonTable.getTablePath, uuid)
+ // Generate table status file name without UUID, for example: tablestatus
+ val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ childCarbonTable.getTablePath)
+ renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+ }
+ // if true then the commit for one of the child tables has failed
+ val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
+ if (commitFailed) {
+ LOGGER.warn("Reverting table status file to original state")
+ renamedDataMaps.foreach {
+ command =>
+ val carbonTable = command.table
+ // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
+ val backupTableSchemaPath =
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
+ val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+ renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+ }
+ }
+ // after success/failure of commit delete all tablestatus files with UUID in their names.
+ // if commit failed then remove the segment directory
+ cleanUpStaleTableStatusFiles(childCommands.map(_.table),
--- End diff --
1. Make this call in a finally block, so that the stale files are cleaned up even if the commit throws an exception.
2. We also need a mechanism to clean up these files during the clean operation. **We can raise a JIRA to track this issue.**
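A minimal sketch of the suggested structure, assuming an in-memory map stands in for `FileFactory` and that each child's status file lives at `<tablePath>/Metadata/tablestatus` (both illustrative, not CarbonData's API). Each child's `tablestatus_<uuid>` is promoted to `tablestatus` with the old file kept as `tablestatus_backup_<uuid>`, already-committed children are reverted on a partial failure, and the stale-file cleanup sits in `finally`:

```scala
import scala.collection.mutable

// Illustrative in-memory file system (path -> contents); not CarbonData's FileFactory.
final class InMemoryFs(val files: mutable.Map[String, String]) {
  /** Moves `from` to `to`, overwriting `to`; returns false if `from` does not exist. */
  def rename(from: String, to: String): Boolean =
    files.remove(from) match {
      case Some(content) =>
        files.update(to, content)
        true
      case None => false
    }
}

object ChildTableStatusCommit {
  private def statusPath(tablePath: String): String = s"$tablePath/Metadata/tablestatus"

  /** Returns true if every child committed; on partial failure restores the backups. */
  def commit(fs: InMemoryFs, childTablePaths: Seq[String], uuid: String): Boolean = {
    val committed = mutable.ArrayBuffer.empty[String]
    try {
      // Like takeWhile in the diff, forall stops at the first child that fails.
      val allCommitted = childTablePaths.forall { path =>
        val status = statusPath(path)
        val ok = fs.files.contains(s"${status}_$uuid") &&
          fs.rename(status, s"${status}_backup_$uuid") &&
          fs.rename(s"${status}_$uuid", status)
        if (ok) committed += path
        ok
      }
      if (!allCommitted) {
        // Revert the children that were already committed: backup -> tablestatus.
        committed.foreach { path =>
          val status = statusPath(path)
          fs.rename(s"${status}_backup_$uuid", status)
        }
      }
      allCommitted
    } finally {
      // Runs on success, failure or exception: drop every leftover file carrying the UUID.
      val stale = fs.files.keys.filter(_.contains(uuid)).toList
      stale.foreach(key => fs.files.remove(key))
    }
  }
}
```

Because the cleanup is in `finally`, an exception thrown midway through the renames still leaves no `_<uuid>` or `_backup_<uuid>` files behind; files orphaned by a crashed driver would still need the separate clean-operation mechanism.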
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3580/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178719972
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
@@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
}
+ test("test auto compaction on aggregate table") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3"))
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
--- End diff --
1. Wrap the test case in try/finally. If the test fails, the property still needs to be reset to its default value; otherwise the subsequent test cases might fail.
2. Replace "false" with the default value from CarbonCommonConstants.
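A minimal sketch of the try/finally pattern, using hypothetical `Props` and `Constants` objects in place of `CarbonProperties` and `CarbonCommonConstants` (the key string and its default value are illustrative):

```scala
import scala.collection.mutable

// Hypothetical stand-in for CarbonProperties: a global key/value store.
object Props {
  private val values = mutable.Map.empty[String, String]
  def add(key: String, value: String): Unit = values.update(key, value)
  def get(key: String): Option[String] = values.get(key)
}

// Hypothetical stand-in for CarbonCommonConstants; names and values are illustrative.
object Constants {
  val EnableAutoLoadMerge = "carbon.enable.auto.load.merge"
  val EnableAutoLoadMergeDefault = "false"
}

object PropertyFixture {
  /** Runs `body` with `key` set to `value`; restores `default` even if `body` throws. */
  def withProperty[T](key: String, value: String, default: String)(body: => T): T = {
    Props.add(key, value)
    try body
    finally Props.add(key, default)
  }
}
```

In the test above, the four loads and the segment assertion would go inside `withProperty(...) { ... }`, so a failing assertion can no longer leak auto load merge into the following test cases.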
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4689/
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4158/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4658/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4197/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178217935
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
@@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
override def executeCompaction(): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val loadMetaDataDetails = identifySegmentsToBeMerged()
- val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+ // If segmentFile name is specified in load details then segment is for partition table
+ // therefore the segment file name should be loadName#segmentFileName.segment
+ val segments = loadMetaDataDetails.asScala.map {
+ loadDetail => if (loadDetail.getSegmentFile != null) {
+ loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
+ } else {
+ loadDetail.getLoadName
+ }
--- End diff --
Instead of building the string by hand, just use new Segment(loadDetail.getLoadName, loadDetail.getSegmentFile).toString.
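For reference, the string the diff builds by hand is `loadName` for a non-partition segment and `loadName#segmentFile` otherwise. A minimal sketch of that format using a hypothetical `SegmentRef` class; it assumes, without verifying, that CarbonData's `Segment.toString` produces the same shape:

```scala
// Hypothetical model of the segment string: "loadName" for a non-partition segment,
// "loadName#segmentFile" when the load detail records a segment file.
final case class SegmentRef(loadName: String, segmentFile: Option[String]) {
  override def toString: String = segmentFile.fold(loadName)(file => s"$loadName#$file")
}

object SegmentStrings {
  // Mirrors the null check in the diff: a null segment file means a non-partition segment.
  def of(loadName: String, segmentFile: String): String =
    SegmentRef(loadName, Option(segmentFile)).toString
}
```

Keeping the format in one class means the compactor and any other caller cannot drift apart on the `#` separator.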
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4692/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178787300
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -62,8 +63,25 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val parentPartitionColumns = if (parentTable.isHivePartitionTable) {
+ partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ } else {
+ Seq()
+ }
+ // Generate child table partition columns in the same order as the parent table.
+ val partitionerFields = fieldRelationMap.collect {
+ case (field, dataMapField) if parentPartitionColumns
+ .exists(parentCol =>
+ parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+ dataMapField.aggregateFunction.isEmpty) =>
+ (PartitionerField(field.name.get,
+ field.dataType,
+ field.columnComment), parentPartitionColumns
+ .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
+ }.toSeq.sortBy(_._2).map(_._1)
--- End diff --
I think the sortBy and map operations are not required here, as the child datamap columns are already ordered according to the parent table's partition column order.
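One way to make the ordering hold by construction, sketched with a hypothetical `ChildField` model rather than the PR's `DataMapField`, is to iterate over the parent's partition columns and keep those that appear as non-aggregated child columns:

```scala
// Illustrative model of a pre-aggregate child column: the parent column it is derived
// from and its aggregate function ("" for a plain projection / group-by column).
final case class ChildField(name: String, parentColumn: String, aggregateFunction: String)

object ChildPartitionColumns {
  /**
   * Walks the parent's partition columns in order, so the result follows the parent's
   * ordering without a sort; only non-aggregated child columns qualify, as in the diff's
   * `aggregateFunction.isEmpty` check.
   */
  def derive(parentPartitionColumns: Seq[String], childFields: Seq[ChildField]): Seq[String] =
    parentPartitionColumns.flatMap { parentCol =>
      childFields
        .find(f => f.parentColumn == parentCol && f.aggregateFunction.isEmpty)
        .map(_.name)
    }
}
```

A partition column that appears only inside an aggregate yields no child partition column, which matches the expectations in the `StandardPartitionWithPreaggregateTestCase` quoted later in this thread.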
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4259/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4761/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4193/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801725
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+ val carbonTable = preStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+ }
+ }
+}
+
+trait CommitHelper {
+
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+ protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+ operationContext: OperationContext,
+ carbonTable: CarbonTable): Unit = {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+ val segmentBeingLoaded =
+ operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+ val newDetails = loadMetaDataDetails.collect {
+ case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+ detail
+ case others => others
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+ }
+
+ /**
+ * Used to rename table status files for commit operation.
+ */
+ protected def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String): Boolean = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
+ ""
+ }
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+ if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+ LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+ oldCarbonFile.renameForce(destinationFileName)
+ } else {
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Used to remove table status files with UUID and segment folders.
+ */
+ protected def cleanUpStaleTableStatusFiles(
+ childTables: Seq[CarbonTable],
+ operationContext: OperationContext,
+ uuid: String): Unit = {
+ childTables.foreach { childTable =>
+ val metaDataDir = FileFactory.getCarbonFile(
+ CarbonTablePath.getMetadataPath(childTable.getTablePath))
+ val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.contains(uuid) || file.getName.contains("backup")
+ }
+ })
+ tableStatusFiles.foreach(_.delete())
+ }
+ }
+}
+
+object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
+ val carbonTable = postStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ val renamedDataMaps = childCommands.takeWhile {
+ childCommand =>
+ val childCarbonTable = childCommand.table
+ val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ childCarbonTable.getTablePath, uuid)
+ // Generate table status file name without UUID, for example: tablestatus
+ val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ childCarbonTable.getTablePath)
+ renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+ }
+ // if true then the commit for one of the child tables has failed
+ val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
+ if (commitFailed) {
+ LOGGER.warn("Reverting table status file to original state")
+ renamedDataMaps.foreach {
+ command =>
+ val carbonTable = command.table
+ // rename the backup tablestatus i.e tablestatus_backup_UUID to tablestatus
+ val backupTableSchemaPath =
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
+ val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+ renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+ }
+ }
+ // after success/failure of commit delete all tablestatus files with UUID in their names.
+ // if commit failed then remove the segment directory
+ cleanUpStaleTableStatusFiles(childCommands.map(_.table),
+ operationContext,
+ uuid)
+ if (commitFailed) {
+ sys.error("Failed to update table status for pre-aggregate table")
+ }
+
+ }
+ }
+}
+
+object AlterTableDropPartitionMetaListener extends OperationEventListener{
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
+ val parentCarbonTable = dropPartitionEvent.parentCarbonTable
+ val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
+ val sparkSession = SparkSession.getActiveSession.get
+ if (parentCarbonTable.hasAggregationDataMap) {
+ // used as a flag to block direct drop partition on aggregate tables fired by the user
+ operationContext.setProperty("isInternalDropCall", "true")
+ // Filter out all the tables which don't have the partition being dropped.
+ val childTablesWithoutPartitionColumns =
+ parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema =>
+ val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala
+ val partitionColExists = partitionsToBeDropped.forall {
+ partition =>
+ childColumns.exists { childColumn =>
+ childColumn.getAggFunction.isEmpty &&
+ childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
+ equals(partition)
+ }
+ }
+ !partitionColExists
+ }
+ if (childTablesWithoutPartitionColumns.nonEmpty) {
+ throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" +
+ s" participating in the following datamaps ${
+ childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
--- End diff --
Verified
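The validation in the diff above rejects the drop if any child datamap lacks one of the partitions being dropped as a non-aggregated column. A minimal sketch of the same rule, with hypothetical `ChildColumnRef`/`DataMapChild` models in place of the schema classes:

```scala
// Hypothetical models: a child column records the parent column it maps to and
// whether it is aggregated; a datamap child is a table name plus its columns.
final case class ChildColumnRef(parentColumn: String, aggregated: Boolean)
final case class DataMapChild(tableName: String, columns: Seq[ChildColumnRef])

object DropPartitionValidation {
  /** Names of child tables that lack any partition being dropped as a plain column. */
  def tablesMissingPartitions(partitionsToDrop: Seq[String],
      children: Seq[DataMapChild]): Seq[String] =
    children.filterNot { child =>
      partitionsToDrop.forall { partition =>
        child.columns.exists(c => !c.aggregated && c.parentColumn == partition)
      }
    }.map(_.tableName)

  def validate(partitionsToDrop: Seq[String],
      children: Seq[DataMapChild]): Either[String, Unit] = {
    val missing = tablesMissingPartitions(partitionsToDrop, children)
    if (missing.isEmpty) Right(())
    else Left("Cannot drop partition as one of the partitions is not participating " +
      s"in the following datamaps: ${missing.mkString(", ")}")
  }
}
```

A child that only aggregates the partition column (like `sum(city)`) cannot have that partition dropped alongside the parent, so the whole drop is refused rather than leaving the child inconsistent.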
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178218263
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
-
+ currPartitions = if (table.isHivePartitionTable) {
--- End diff --
This is a metastore call; it is not supposed to be kept under processData.
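A minimal sketch of the split the reviewer describes, using a hypothetical `TwoPhaseCommand` trait and a stand-in `listPartitions` function instead of a real catalog call: the partition lookup happens once in `processMetadata`, and `processData` only reads the captured `currPartitions` value.

```scala
// Hypothetical two-phase command; loosely modelled on the processMetadata/processData
// split, not on CarbonData's actual command classes.
trait TwoPhaseCommand {
  def processMetadata(): Unit
  def processData(): Unit
  final def run(): Unit = {
    processMetadata()
    processData()
  }
}

// `listPartitions` stands in for the metastore lookup.
final class LoadCommand(isHivePartitionTable: Boolean, listPartitions: () => Seq[String])
    extends TwoPhaseCommand {
  // Captured in the metadata phase so the data phase makes no metastore call.
  private var currPartitions: Seq[String] = Seq.empty
  private var loadedWith: Seq[String] = Seq.empty

  override def processMetadata(): Unit = {
    if (isHivePartitionTable) {
      currPartitions = listPartitions()
    }
  }

  override def processData(): Unit = {
    // Only reads the captured value; the real load would happen here.
    loadedWith = currPartitions
  }

  def partitionsUsedByLoad: Seq[String] = loadedWith
}
```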
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4763/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3512/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801685
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -62,8 +63,25 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val parentPartitionColumns = if (parentTable.isHivePartitionTable) {
+ partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ } else {
+ Seq()
+ }
+ // Generate child table partition columns in the same order as the parent table.
+ val partitionerFields = fieldRelationMap.collect {
+ case (field, dataMapField) if parentPartitionColumns
+ .exists(parentCol =>
+ parentCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+ dataMapField.aggregateFunction.isEmpty) =>
+ (PartitionerField(field.name.get,
+ field.dataType,
+ field.columnComment), parentPartitionColumns
+ .indexOf(dataMapField.columnTableRelationList.get.head.parentColumnName))
+ }.toSeq.sortBy(_._2).map(_._1)
--- End diff --
Removed sortBy.
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4267/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3463/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3565/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178219637
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
--- End diff --
Just use parentTable.isHivePartitionTable instead of checking partitionInfo.
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4244/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178727711
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
@@ -0,0 +1,488 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
+
+ val testData = s"$resourcesPath/sample.csv"
+
+ override def beforeAll(): Unit = {
+ sql("drop database if exists partition_preaggregate cascade")
+ sql("create database partition_preaggregate")
+ sql("use partition_preaggregate")
+ sql(
+ """
+ | CREATE TABLE par(id INT, name STRING, age INT) PARTITIONED BY(city STRING)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string) partitioned by (age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ }
+
+ override def afterAll(): Unit = {
+ sql("drop database if exists partition_preaggregate cascade")
+ sql("use default")
+ }
+
+ // Create aggregate table on partition with partition column in aggregation only.
+ test("test preaggregate table creation on partition table with partition col as aggregation") {
+ sql("create datamap p1 on table par using 'preaggregate' as select id, sum(city) from par group by id")
+ assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p1")(sqlContext.sparkSession).isHivePartitionTable)
+ }
+
+ // Create aggregate table on partition with partition column in projection and aggregation only.
+ test("test preaggregate table creation on partition table with partition col as projection") {
+ sql("create datamap p2 on table par using 'preaggregate' as select id, city, min(city) from par group by id,city ")
+ assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p2")(sqlContext.sparkSession).isHivePartitionTable)
+ }
+
+ // Create aggregate table on partition with partition column as group by.
+ test("test preaggregate table creation on partition table with partition col as group by") {
+ sql("create datamap p3 on table par using 'preaggregate' as select id, max(city) from par group by id,city ")
+ assert(CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p3")(sqlContext.sparkSession).isHivePartitionTable)
+ }
+
+ // Create aggregate table on partition without partition column.
+ test("test preaggregate table creation on partition table without partition column") {
+ sql("create datamap p4 on table par using 'preaggregate' as select name, count(id) from par group by name ")
+ assert(!CarbonEnv.getCarbonTable(Some("partition_preaggregate"), "par_p4")(sqlContext.sparkSession).isHivePartitionTable)
+ }
+
+ test("test data correction in aggregate table when partition column is used") {
+ sql("create datamap p1 on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id, age")
+ checkAnswer(sql("select * from maintable_p1"),
+ Seq(Row(1,31,31),
+ Row(2,27,27),
+ Row(3,70,35),
+ Row(4,26,26),
+ Row(4,29,29)))
+ preAggTableValidator(sql("select id, sum(age) from maintable group by id, age").queryExecution.analyzed, "maintable_p1")
+ sql("drop datamap p1 on table maintable")
+ }
+
+ test("test data correction in aggregate table when partition column is not used") {
+ sql("create datamap p2 on table maintable using 'preaggregate' as select id, max(age) from maintable group by id")
+ checkAnswer(sql("select * from maintable_p2"),
+ Seq(Row(1,31),
+ Row(2,27),
+ Row(3,35),
+ Row(4,29)))
+ preAggTableValidator(sql("select id, max(age) from maintable group by id").queryExecution.analyzed, "maintable_p2")
+ sql("drop datamap p2 on table maintable")
+ }
+
+ test("test data correction with insert overwrite") {
+ sql("drop table if exists partitionone")
+ sql(
+ """
+ | CREATE TABLE if not exists partitionone (empname String)
+ | PARTITIONED BY (year int, month int,day int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
+ sql("insert into partitionone values('k',2014,1,1)")
+ sql("insert overwrite table partitionone values('v',2014,1,1)")
+ checkAnswer(sql("select * from partitionone"), Seq(Row("v",2014,1,1)))
+ checkAnswer(sql("select * from partitionone_p1"), Seq(Row("v",2014,2014,1,1)))
+ }
+
+ test("test data correction with insert overwrite on different value") {
+ sql("drop table if exists partitionone")
+ sql(
+ """
+ | CREATE TABLE if not exists partitionone (empname String)
+ | PARTITIONED BY (year int, month int,day int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
+ sql("insert into partitionone values('k',2014,1,1)")
+ sql("insert overwrite table partitionone values('v',2015,1,1)")
+ checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1)))
--- End diff --
After the insert overwrite operation, only one row should be returned (Row("v",2015,1,1)). Is my understanding correct?
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801779
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+ val carbonTable = preStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+ }
+ }
+}
+
+trait CommitHelper {
+
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+ protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+ operationContext: OperationContext,
+ carbonTable: CarbonTable): Unit = {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+ val segmentBeingLoaded =
+ operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+ val newDetails = loadMetaDataDetails.collect {
+ case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+ detail
+ case others => others
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+ }
+
+ /**
+ * Used to rename table status files for commit operation.
+ */
+ protected def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String): Boolean = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
+ ""
+ }
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+ if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+ LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+ oldCarbonFile.renameForce(destinationFileName)
+ } else {
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Used to remove table status files with UUID and segment folders.
+ */
+ protected def cleanUpStaleTableStatusFiles(
+ childTables: Seq[CarbonTable],
+ operationContext: OperationContext,
+ uuid: String): Unit = {
+ childTables.foreach { childTable =>
+ val metaDataDir = FileFactory.getCarbonFile(
+ CarbonTablePath.getMetadataPath(childTable.getTablePath))
+ val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.contains(uuid) || file.getName.contains("backup")
+ }
+ })
+ tableStatusFiles.foreach(_.delete())
+ }
+ }
+}
+
+object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
+ val carbonTable = postStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ val renamedDataMaps = childCommands.takeWhile {
+ childCommand =>
+ val childCarbonTable = childCommand.table
+ val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ childCarbonTable.getTablePath, uuid)
+ // Generate table status file name without UUID, for example: tablestatus
+ val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ childCarbonTable.getTablePath)
+ renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+ }
+ // if true then the commit for one of the child tables has failed
+ val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
+ if (commitFailed) {
+ LOGGER.warn("Reverting table status file to original state")
+ renamedDataMaps.foreach {
+ command =>
+ val carbonTable = command.table
+ // rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
+ val backupTableSchemaPath =
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
+ val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+ renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+ }
+ }
+ // after success/failure of commit delete all tablestatus files with UUID in their names.
+ // if commit failed then remove the segment directory
+ cleanUpStaleTableStatusFiles(childCommands.map(_.table),
--- End diff --
Added TODO
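A side note on the quoted `cleanUpStaleTableStatusFiles`: on the JVM, `"any".contains("")` is always true, so if that method were ever called with an empty `uuid` (the listener falls back to `""` when the `uuid` property is missing), the filter `file.getName.contains(uuid) || file.getName.contains("backup")` would match every file in the child table's metadata directory. A minimal guarded sketch follows. It is hypothetical: `StaleStatusCleanup` is not a CarbonData class, and `java.io.File` stands in for `CarbonFile` and `CarbonFileFilter` so the example runs on its own.

```scala
import java.io.File

// Hypothetical helper, not a CarbonData API. java.io.File stands in for
// CarbonFile so that the example runs without CarbonData on the classpath.
object StaleStatusCleanup {
  // "tablestatus".contains("") is true, so an empty uuid must be excluded
  // explicitly; otherwise every file in the directory would be selected.
  def isStale(fileName: String, uuid: String): Boolean =
    (uuid.nonEmpty && fileName.contains(uuid)) || fileName.contains("backup")

  // Deletes the stale status files in `metadataDir` and returns their names, sorted.
  def cleanUp(metadataDir: File, uuid: String): Seq[String] = {
    val files = Option(metadataDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty)
    val stale = files.filter(f => f.isFile && isStale(f.getName, uuid))
    stale.foreach(_.delete())
    stale.map(_.getName).sorted
  }
}
```

With an empty `uuid`, the guarded filter still selects files whose names contain `backup` but leaves `tablestatus` and other metadata files alone.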
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4169/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3465/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3459/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4688/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801803
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+ protected def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String): Boolean = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
--- End diff --
done
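The commit protocol built from `renameDataMapTableStatusFiles` and the `takeWhile` loop in `AlterTableDropPartitionPostStatusListener` can be modelled in isolation roughly as follows: for each child, move `tablestatus` aside to `tablestatus_backup_<uuid>`, promote `tablestatus_<uuid>`, stop at the first failure, and restore the backups of the children already promoted. This is a simplified, hypothetical sketch: `java.nio.file.Files.move` with `REPLACE_EXISTING` stands in for `CarbonFile.renameForce`, each child table is a plain directory, and, unlike the quoted code, `promote` also puts the old file back if the second rename fails.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}
import scala.util.Try

// Hypothetical model of the commit step; Files.move stands in for
// CarbonFile.renameForce and each child table is a plain directory.
object StatusCommit {
  private def move(from: Path, to: Path): Boolean =
    Try(Files.move(from, to, StandardCopyOption.REPLACE_EXISTING)).isSuccess

  // Promote tablestatus_<uuid> to tablestatus, keeping the previous file as
  // tablestatus_backup_<uuid>. If the second rename fails, put the old file back.
  def promote(dir: Path, uuid: String): Boolean = {
    val staged = dir.resolve(s"tablestatus_$uuid")
    val current = dir.resolve("tablestatus")
    val backup = dir.resolve(s"tablestatus_backup_$uuid")
    if (!Files.exists(staged) || !Files.exists(current)) false
    else if (!move(current, backup)) false
    else if (move(staged, current)) true
    else { move(backup, current); false }
  }

  // Roll a promoted child back to its previous tablestatus.
  def restore(dir: Path, uuid: String): Boolean =
    move(dir.resolve(s"tablestatus_backup_$uuid"), dir.resolve("tablestatus"))

  // Commit children in order; stop at the first failure and roll back the
  // children that were already promoted. Returns true only if all committed.
  def commitAll(dirs: Seq[Path], uuid: String): Boolean = {
    val promoted = dirs.takeWhile(d => promote(d, uuid))
    val committed = promoted.length == dirs.length
    if (!committed) promoted.foreach(d => restore(d, uuid))
    committed
  }
}
```

In the quoted listener, the role of `restore` is played by the second call to `renameDataMapTableStatusFiles` with an empty `uuid`, which renames `tablestatus_backup_<uuid>` back to `tablestatus`.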
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3439/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4690/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4266/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3533/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4194/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4269/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178780592
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+ val renamedDataMaps = childCommands.takeWhile {
+ childCommand =>
+ val childCarbonTable = childCommand.table
+ val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ childCarbonTable.getTablePath, uuid)
+ // Generate table status file name without UUID, for example: tablestatus
+ val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ childCarbonTable.getTablePath)
+ renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+ }
+ // if true then the commit for one of the child tables has failed
+ val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
+ if (commitFailed) {
+ LOGGER.warn("Reverting table status file to original state")
--- End diff --
Make it info or error level.
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4199/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3532/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4196/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801858
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
table.getDatabaseName,
table.getTableName,
locksToBeAcquired)(sparkSession)
+ // If flow is for child table then get the uuid from operation context.
+ // If flow is for parent table then generate uuid for child flows and set the uuid to ""
+ // for parent table
+ // If normal table then set uuid to "".
+ val uuid = if (table.isChildDataMap) {
+ Option(operationContext.getProperty("uuid")).getOrElse("").toString
--- End diff --
Logged the message
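The uuid rule described in the comment above (a child flow reuses the uuid from the operation context, a parent flow generates one for its children and uses `""` itself, a normal table uses `""`) can be written as a small function. A hedged sketch: `DropPartitionUuid` is hypothetical, a mutable `Map` stands in for `OperationContext`, and the `isParentWithAggregates` flag is an assumption about how the parent case is detected; the `"uuid"` key is the property name read in `AlterTableDropPartitionPostStatusListener`.

```scala
import java.util.UUID
import scala.collection.mutable

// Hypothetical sketch: `ctx` stands in for OperationContext, `isChild` for
// table.isChildDataMap and `isParentWithAggregates` for a parent table that
// has preaggregate datamaps.
object DropPartitionUuid {
  def resolve(isChild: Boolean, isParentWithAggregates: Boolean,
      ctx: mutable.Map[String, String]): String =
    if (isChild) {
      // Child flow: reuse the uuid published by the parent ("" if none).
      ctx.getOrElse("uuid", "")
    } else if (isParentWithAggregates) {
      // Parent flow: publish a fresh uuid for the child flows, use "" itself.
      ctx("uuid") = UUID.randomUUID().toString
      ""
    } else {
      // Normal table: no uuid.
      ""
    }
}
```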
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178795973
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
@@ -0,0 +1,488 @@
+ test("test data correction with insert overwrite on different value") {
+ sql("drop table if exists partitionone")
+ sql(
+ """
+ | CREATE TABLE if not exists partitionone (empname String)
+ | PARTITIONED BY (year int, month int,day int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql("create datamap p1 on table partitionone using 'preaggregate' as select empname, sum(year) from partitionone group by empname, year, month,day")
+ sql("insert into partitionone values('k',2014,1,1)")
+ sql("insert overwrite table partitionone values('v',2015,1,1)")
+ checkAnswer(sql("select * from partitionone"), Seq(Row("k",2014,1,1), Row("v",2015,1,1)))
--- End diff --
For a partitioned table, overwrite works per partition, not on the whole table, so only the matching partitions are overwritten.
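To make the partition-level semantics concrete, here is a toy Scala model of the two quoted insert overwrite tests. It is not CarbonData code: a table partitioned by `(year, month, day)` is modelled as an immutable `Map` from partition values to the `empname` values stored in that partition, and overwrite replaces only the partitions that receive new rows, similar in spirit to Spark's dynamic partition overwrite mode.

```scala
// Toy model, not CarbonData code: a table partitioned by (year, month, day)
// is a map from partition values to the empname values stored there.
object PartitionOverwrite {
  type Spec = (Int, Int, Int)
  type Table = Map[Spec, Seq[String]]

  // INSERT INTO: append each row to its partition.
  def insert(t: Table, rows: Seq[(String, Spec)]): Table =
    rows.foldLeft(t) { case (acc, (name, spec)) =>
      acc.updated(spec, acc.getOrElse(spec, Seq.empty) :+ name)
    }

  // INSERT OVERWRITE at partition level: only partitions that receive new
  // rows are replaced; all other partitions are kept as they are.
  def overwrite(t: Table, rows: Seq[(String, Spec)]): Table = {
    val incoming = rows.groupBy(_._2).map { case (spec, rs) => spec -> rs.map(_._1) }
    t ++ incoming
  }
}
```

Starting from a single row `k` in partition (2014, 1, 1), overwriting with `v` in (2015, 1, 1) leaves both partitions in place, which matches the expected answer `Seq(Row("k",2014,1,1), Row("v",2015,1,1))` in the second test.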
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178239706
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
+ partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ PreAggregateUtil
+ .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+ } else {Seq()}
--- End diff --
done
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3608/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3535/
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3423/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4243/
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3422/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178223172
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
+ partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ PreAggregateUtil
+ .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+ } else {Seq()}
+ val partitionerFields = fieldRelationMap.collect {
+ case (field, dataMapField) if childPartitionColumns
+ .exists(childCol =>
+ childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+ dataMapField.aggregateFunction.isEmpty) =>
+ PartitionerField(field.name.get, field.dataType, field.columnComment)
+ }.toSeq
--- End diff --
Please add some tests to make sure the order of the partition columns is the same as the order of the parent partition columns.
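Such a test needs an ordering check. One possible helper is sketched below. It assumes each child partition column has already been mapped back to the parent column it was derived from. The helper and its name are hypothetical and are not part of the PR.

```scala
object PartitionOrderCheckSketch {
  /**
   * True when `childParentColumns` (the parent column behind each child partition
   * column, listed in child order) names only parent partition columns, in the
   * same relative order as `parentPartitionColumns`. Comparison ignores case.
   */
  def sameRelativeOrder(
      parentPartitionColumns: Seq[String],
      childParentColumns: Seq[String]): Boolean = {
    val child = childParentColumns.map(_.toLowerCase)
    // Keep the parent's order, restricted to the columns the child actually uses.
    parentPartitionColumns.map(_.toLowerCase).filter(child.contains) == child
  }
}
```

A test could then assert `sameRelativeOrder(parentCols, childCols)` after creating the pre-aggregate table.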
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178222428
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
+ partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ PreAggregateUtil
+ .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
--- End diff --
Why is it required to extract the columns again? Can't you get them from fieldRelationMap?
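The reviewer's suggestion can be sketched as follows. The child partition fields are taken straight from `fieldRelationMap`: non-aggregated fields whose parent column is a parent partition column. `Field` and `DataMapField` here are simplified, hypothetical stand-ins for the real classes. The sketch also assumes an empty `aggregateFunction` means "no aggregation", as in the diff.

```scala
object PartitionFieldsFromRelationMapSketch {
  // Hypothetical, simplified stand-ins for the diff's Field / DataMapField.
  final case class Field(name: String)
  final case class DataMapField(parentColumnName: String, aggregateFunction: String)

  /** Child fields that are plain (non-aggregated) copies of a parent partition column. */
  def childPartitionFields(
      fieldRelationMap: Map[Field, DataMapField],
      parentPartitionColumns: Seq[String]): Seq[Field] =
    fieldRelationMap.collect {
      case (field, dataMapField)
          if dataMapField.aggregateFunction.isEmpty &&
            parentPartitionColumns.exists(_.equalsIgnoreCase(dataMapField.parentColumnName)) =>
        field
    }.toSeq
}
```

The result follows the `Map`'s iteration order, which an immutable `Map` does not tie to the parent's partition order. On its own, this approach does not settle the ordering concern raised elsewhere in this review.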
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4685/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801747
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+ val carbonTable = preStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+ }
+ }
+}
+
+trait CommitHelper {
+
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+ protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+ operationContext: OperationContext,
+ carbonTable: CarbonTable): Unit = {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+ val segmentBeingLoaded =
+ operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+ val newDetails = loadMetaDataDetails.collect {
+ case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+ detail
+ case others => others
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+ }
+
+ /**
+ * Used to rename table status files for commit operation.
+ */
+ protected def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String): Boolean = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
+ ""
+ }
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+ if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+ LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+ oldCarbonFile.renameForce(destinationFileName)
+ } else {
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Used to remove table status files with UUID and segment folders.
+ */
+ protected def cleanUpStaleTableStatusFiles(
+ childTables: Seq[CarbonTable],
+ operationContext: OperationContext,
+ uuid: String): Unit = {
+ childTables.foreach { childTable =>
+ val metaDataDir = FileFactory.getCarbonFile(
+ CarbonTablePath.getMetadataPath(childTable.getTablePath))
+ val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.contains(uuid) || file.getName.contains("backup")
+ }
+ })
+ tableStatusFiles.foreach(_.delete())
+ }
+ }
+}
+
+object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
+ val carbonTable = postStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ val renamedDataMaps = childCommands.takeWhile {
+ childCommand =>
+ val childCarbonTable = childCommand.table
+ val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ childCarbonTable.getTablePath, uuid)
+ // Generate table status file name without UUID, for example: tablestatus
+ val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ childCarbonTable.getTablePath)
+ renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+ }
+ // if true then the commit for one of the child tables has failed
+ val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
+ if (commitFailed) {
+ LOGGER.warn("Reverting table status file to original state")
--- End diff --
done
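The commit flow in this listener uses `takeWhile` to rename each child's status file. It stops at the first failure and compares lengths to detect a partial commit. On the failure path, `markInProgressSegmentAsDeleted` flags the segment being loaded. Both steps are sketched below with hypothetical stand-in types (`SegmentState`, `LoadDetail`) and a caller-supplied `commit` function in place of the file renames.

```scala
object ChildCommitSketch {
  // Hypothetical stand-ins for SegmentStatus and LoadMetadataDetails.
  sealed trait SegmentState
  case object Success extends SegmentState
  case object InProgress extends SegmentState
  case object MarkedForDelete extends SegmentState

  final case class LoadDetail(loadName: String, state: SegmentState)

  /**
   * Runs `commit` on each child in order. List.takeWhile is strict, so `commit`
   * is invoked for every child up to and including the first one that fails.
   * Returns true only if every child committed.
   */
  def commitAll[A](children: List[A])(commit: A => Boolean): Boolean = {
    val committed = children.takeWhile(commit)
    committed.lengthCompare(children.length) == 0
  }

  /** Marks the segment being loaded for deletion; all other entries are returned unchanged. */
  def markSegmentForDelete(details: Seq[LoadDetail], segmentBeingLoaded: String): Seq[LoadDetail] =
    details.map {
      case detail if detail.loadName.equalsIgnoreCase(segmentBeingLoaded) =>
        detail.copy(state = MarkedForDelete)
      case other => other
    }
}
```

In the diff, `collect` with a catch-all `case others => others` behaves like `map`. The sketch uses `map` and an immutable `copy` instead of mutating the detail in place.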
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4291/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3509/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4656/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3528/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178222846
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
+ partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ PreAggregateUtil
+ .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+ } else {Seq()}
+ val partitionerFields = fieldRelationMap.collect {
+ case (field, dataMapField) if childPartitionColumns
+ .exists(childCol =>
+ childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+ dataMapField.aggregateFunction.isEmpty) =>
+ PartitionerField(field.name.get, field.dataType, field.columnComment)
+ }.toSeq
--- End diff --
The order of partition columns is important. Please make sure that the parent partition column order and the child partition column order are the same.
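One way to guarantee that order is to drive the loop from the parent's partition columns instead of from `fieldRelationMap`. For each parent column, the sketch picks the matching non-aggregated child field. `Field` and `DataMapField` are simplified, hypothetical stand-ins for the diff's types, and the sketch does not claim to reproduce the PR's final fix.

```scala
object OrderedPartitionerFieldsSketch {
  // Hypothetical, simplified stand-ins for the diff's Field / DataMapField.
  final case class Field(name: String)
  final case class DataMapField(parentColumnName: String, aggregateFunction: String)

  /** Child partition fields listed in the same order as the parent's partition columns. */
  def orderedPartitionerFields(
      parentPartitionColumns: Seq[String],
      fieldRelationMap: Map[Field, DataMapField]): Seq[Field] =
    parentPartitionColumns.flatMap { parentColumn =>
      // Parent partition columns not selected in the datamap query are skipped.
      fieldRelationMap.collectFirst {
        case (field, dataMapField)
            if dataMapField.aggregateFunction.isEmpty &&
              dataMapField.parentColumnName.equalsIgnoreCase(parentColumn) =>
          field
      }
    }
}
```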
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4756/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178767625
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+ val carbonTable = preStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+ }
+ }
+}
+
+trait CommitHelper {
+
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+ protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+ operationContext: OperationContext,
+ carbonTable: CarbonTable): Unit = {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+ val segmentBeingLoaded =
+ operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+ val newDetails = loadMetaDataDetails.collect {
+ case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+ detail
+ case others => others
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+ }
+
+ /**
+ * Used to rename table status files for commit operation.
+ */
+ protected def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String): Boolean = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
--- End diff --
Write a comment to explain when uuid will be empty and when it will be non-empty.
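The backup-then-swap idea in `renameDataMapTableStatusFiles`, together with the later stale-file cleanup, can be sketched on a local filesystem. The sketch uses `java.nio.file` rather than CarbonData's `FileFactory`. It treats an empty `uuid` as "take no backup", which is an assumption: the thread does not say when the uuid is empty.

```scala
import java.io.File
import java.nio.file.{Files, Path, StandardCopyOption}

object StatusFileSwapSketch {
  /**
   * Replaces `destination` with `source`. When `uuid` is non-empty, the current
   * destination is first kept as `<destination>_backup_<uuid>`.
   * Returns false if either file is missing.
   */
  def swapWithBackup(source: Path, destination: Path, uuid: String): Boolean = {
    if (!Files.exists(source) || !Files.exists(destination)) {
      false
    } else {
      if (uuid.nonEmpty) {
        // Keep the current status file so it can be restored if a later step fails.
        val backup = destination.resolveSibling(destination.getFileName.toString + "_backup_" + uuid)
        Files.move(destination, backup, StandardCopyOption.REPLACE_EXISTING)
      }
      // Promote the new status file; REPLACE_EXISTING covers the no-backup case.
      Files.move(source, destination, StandardCopyOption.REPLACE_EXISTING)
      true
    }
  }

  /** Deletes files whose names contain the (non-empty) uuid or "backup"; returns how many matched. */
  def cleanUpStale(dir: Path, uuid: String): Int = {
    val stale = Option(dir.toFile.listFiles()).getOrElse(Array.empty[File])
      .filter(f => (uuid.nonEmpty && f.getName.contains(uuid)) || f.getName.contains("backup"))
    stale.foreach(_.delete())
    stale.length
  }
}
```

The `uuid.nonEmpty` guard in `cleanUpStale` matters. Every string contains the empty string, so the diff's `file.getName.contains(uuid)` would match every file in the metadata directory if `cleanUpStaleTableStatusFiles` were ever called with an empty uuid.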
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4803/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801883
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
var locks = List.empty[ICarbonLock]
val uniqueId = System.currentTimeMillis().toString
+ val childCommands = operationContext.getProperty("dropPartitionCommands")
--- End diff --
done
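Both `processData` here and `AlterTableDropPartitionPreStatusListener` read the child commands from the shared operation context under the `"dropPartitionCommands"` key and null-check the result. The sketch below shows that hand-off. Its `Context` is a hypothetical map-backed stand-in for `OperationContext`, and it returns `Option` instead of null. `ChildCommand` is likewise hypothetical.

```scala
import scala.collection.mutable

object OperationContextSketch {
  /** Hypothetical stand-in for a context shared by a command and its event listeners. */
  final class Context {
    private val props = mutable.Map.empty[String, Any]
    def setProperty(key: String, value: Any): Unit = props(key) = value
    def getProperty(key: String): Option[Any] = props.get(key)
  }

  /** Hypothetical stand-in for a child drop-partition command. */
  trait ChildCommand { def processData(): String }

  /** Runs the child commands stored under `key` when the parent has aggregate datamaps. */
  def runChildCommands(ctx: Context, key: String, hasAggregationDataMap: Boolean): Seq[String] =
    ctx.getProperty(key) match {
      case Some(commands: Seq[_]) if hasAggregationDataMap =>
        commands.collect { case command: ChildCommand => command.processData() }
      case _ => Seq.empty // nothing registered, or no aggregate datamaps
    }
}
```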
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4789/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3600/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4302/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4200/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178801930
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala ---
@@ -176,6 +179,17 @@ class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with
segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
}
+ test("test auto compaction on aggregate table") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3"))
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
--- End diff --
added in afterAll
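A reset in `afterAll` runs only once the whole suite finishes. If the assertion in this test fails, auto load merge stays enabled for any later tests in the same suite. A loan-pattern helper restores the value as soon as the block exits, including on failure. The sketch uses a hypothetical in-memory property store and an illustrative key instead of `CarbonProperties`.

```scala
import scala.collection.mutable

object PropertyScopeSketch {
  // Hypothetical in-memory store standing in for a global properties object.
  val properties: mutable.Map[String, String] =
    mutable.Map("enable.auto.load.merge" -> "false")

  /** Sets `key` to `value` while `body` runs, then restores the previous value (or removes the key). */
  def withProperty[T](key: String, value: String)(body: => T): T = {
    val previous = properties.get(key)
    properties(key) = value
    try body
    finally {
      previous match {
        case Some(old) => properties(key) = old
        case None => properties.remove(key)
      }
    }
  }
}
```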
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4736/
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4160/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on the issue:
https://github.com/apache/carbondata/pull/2109
retest this please
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178217187
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
@@ -0,0 +1,440 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import java.io.{File, FileWriter}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
--- End diff --
Please handle insert overwrite and add overwrite test cases for the partition with pre-aggregate scenario.
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3461/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on the issue:
https://github.com/apache/carbondata/pull/2109
retest sdv please
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4303/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3584/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4686/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3458/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4261/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4177/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178239696
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
+ partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ PreAggregateUtil
+ .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
--- End diff --
removed this method call
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178237981
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---
@@ -149,7 +143,16 @@ case class CarbonLoadDataCommand(
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
-
+ currPartitions = if (table.isHivePartitionTable) {
--- End diff --
Actually, I had to move getPartitions to processData because when this method is called for child tables, validSegments are expected to be set, and they are not set in processMeta. I have passed carbonTable to avoid a metastore call, so this should not cause problems.
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4831/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4665/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178264753
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala ---
@@ -0,0 +1,440 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import java.io.{File, FileWriter}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
--- End diff --
added insert overwrite test case
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4760/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178264718
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
+ partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ PreAggregateUtil
+ .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+ } else {Seq()}
+ val partitionerFields = fieldRelationMap.collect {
+ case (field, dataMapField) if childPartitionColumns
+ .exists(childCol =>
+ childCol.equals(dataMapField.columnTableRelationList.get.head.parentColumnName) &&
+ dataMapField.aggregateFunction.isEmpty) =>
+ PartitionerField(field.name.get, field.dataType, field.columnComment)
+ }.toSeq
--- End diff --
done
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4649/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4264/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4265/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178238607
--- Diff: integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala ---
@@ -47,7 +47,15 @@ class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
override def executeCompaction(): Unit = {
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
val loadMetaDataDetails = identifySegmentsToBeMerged()
- val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+ // If segmentFile name is specified in load details then segment is for partition table
+ // therefore the segment file name should be loadName#segmentFileName.segment
+ val segments = loadMetaDataDetails.asScala.map {
+ loadDetail => if (loadDetail.getSegmentFile != null) {
+ loadDetail.getLoadName + "#" + loadDetail.getSegmentFile
+ } else {
+ loadDetail.getLoadName
+ }
--- End diff --
ok
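The naming rule in this diff can be sketched as a small pure function. A segment that carries a segment file (partition table) is addressed as `loadName#segmentFile`. Any other segment uses the bare load name. `LoadDetail` is a hypothetical stand-in for `LoadMetadataDetails`. Because the real `getSegmentFile` returns `null` when unset, it would be wrapped as `Option(loadDetail.getSegmentFile)`.

```scala
// Hypothetical stand-in exposing only the two getters used in the diff.
final case class LoadDetail(loadName: String, segmentFile: Option[String])

object SegmentNameSketch {
  // Partition-table segments: "<loadName>#<segmentFile>"; others: "<loadName>".
  def segmentName(detail: LoadDetail): String =
    detail.segmentFile.fold(detail.loadName)(file => s"${detail.loadName}#$file")
}
```

For example, `LoadDetail("1", Some("1_x.segment"))` (an illustrative file name) maps to `1#1_x.segment`.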
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4648/
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4127/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4762/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178758267
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -136,6 +140,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
var locks = List.empty[ICarbonLock]
val uniqueId = System.currentTimeMillis().toString
+ val childCommands = operationContext.getProperty("dropPartitionCommands")
--- End diff --
Remove this; it is unused.
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3462/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Success with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3536/
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4623/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4808/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178222584
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala ---
@@ -59,8 +60,22 @@ case class PreAggregateTableHelper(
val df = sparkSession.sql(updatedQuery)
val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
df.logicalPlan, queryString)
+ val partitionInfo = parentTable.getPartitionInfo
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
+ val childPartitionColumns = if (partitionInfo != null &&
+ partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+ val parentPartitionColumns = partitionInfo.getColumnSchemaList.asScala.map(_.getColumnName)
+ PreAggregateUtil
+ .extractPartitionInfoForAggregateTable(df.logicalPlan, parentPartitionColumns)
+ } else {Seq()}
--- End diff --
format it properly
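One possible tidier layout for the `childPartitionColumns` expression is a single `match` with a guard. It avoids the nested `if` and the inline `{Seq()}`. This is a sketch only. `PartitionInfo` and `PartitionType` here are hypothetical stand-ins, and the `extract` parameter stands in for `PreAggregateUtil.extractPartitionInfoForAggregateTable(df.logicalPlan, _)`.

```scala
object ChildPartitionColumnsSketch {
  // Hypothetical stand-ins for CarbonData's PartitionInfo / PartitionType.
  sealed trait PartitionType
  case object NativeHive extends PartitionType
  case object Range extends PartitionType
  final case class PartitionInfo(partitionType: PartitionType, columnNames: Seq[String])

  // Only native Hive partitioning propagates partition columns to the child table.
  def childPartitionColumns(
      partitionInfo: Option[PartitionInfo],
      extract: Seq[String] => Seq[String]): Seq[String] =
    partitionInfo match {
      case Some(info) if info.partitionType == NativeHive =>
        extract(info.columnNames)
      case _ =>
        Seq.empty
    }
}
```

In the real code the nullable `parentTable.getPartitionInfo` would be wrapped with `Option(...)` before matching.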
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4764/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178719497
--- Diff: hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java ---
@@ -130,15 +130,20 @@ public CarbonOutputCommitter(Path outputPath, TaskAttemptContext context) throws
loadModel.getTablePath());
newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
}
+ OperationContext operationContext = (OperationContext) getOperationContext();
+ String uuid = "";
+ if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() &&
+ operationContext != null) {
+ uuid = operationContext.getProperty("uuid").toString();
+ }
CarbonLoaderUtil
.populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
true);
CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
long segmentSize = CarbonLoaderUtil
.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
if (segmentSize > 0 || overwriteSet) {
- Object operationContext = getOperationContext();
- if (operationContext != null) {
+ if (operationContext != null && carbonTable.hasAggregationDataMap()) {
((OperationContext) operationContext)
--- End diff --
Remove the repeated typecast; operationContext is already cast to OperationContext where it is first read.
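The point of this comment is to narrow the untyped context once and reuse the typed value, instead of casting again at each use. The same idea is sketched below in Scala. `OperationContext` here is a hypothetical stand-in, not the real `org.apache.carbondata.events.OperationContext`.

```scala
import scala.collection.mutable

// Hypothetical stand-in for CarbonData's OperationContext.
final class OperationContext {
  private val props = mutable.Map.empty[String, AnyRef]
  def setProperty(key: String, value: AnyRef): Unit = props(key) = value
  def getProperty(key: String): AnyRef = props.getOrElse(key, null)
}

object TypedContextSketch {
  // Narrow once; callers then work with Option[OperationContext] and never cast again.
  def asOperationContext(raw: AnyRef): Option[OperationContext] = raw match {
    case ctx: OperationContext => Some(ctx)
    case _ => None // also covers null
  }
}
```

In the Java diff itself, the equivalent fix is simply calling `operationContext.setProperty(...)` on the already-typed local variable.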
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178761202
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -147,6 +152,18 @@ case class CarbonAlterTableDropHivePartitionCommand(
table.getDatabaseName,
table.getTableName,
locksToBeAcquired)(sparkSession)
+ // If flow is for child table then get the uuid from operation context.
+ // If flow is for parent table then generate uuid for child flows and set the uuid to ""
+ // for parent table
+ // If normal table then set uuid to "".
+ val uuid = if (table.isChildDataMap) {
+ Option(operationContext.getProperty("uuid")).getOrElse("").toString
--- End diff --
When isChildDataMap is true, the UUID set by the parent table is mandatory. If it is not set, log a message here, or better, throw an exception.
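Following the review suggestion, the UUID lookup for a child datamap could fail fast instead of falling back to `""`. Below is a minimal sketch. `resolveUuid` is a hypothetical helper. The raw property would come from `Option(operationContext.getProperty("uuid")).map(_.toString)`. `IllegalStateException` is an assumed choice of exception type.

```scala
object UuidResolutionSketch {
  // Child datamap: the parent flow must have set the UUID, so a missing value is an error.
  // Parent and normal tables: no UUID suffix is used, so return "".
  def resolveUuid(isChildDataMap: Boolean, uuidProperty: Option[String]): String =
    if (isChildDataMap) {
      uuidProperty.filter(_.nonEmpty).getOrElse(
        throw new IllegalStateException(
          "uuid is not set in the operation context by the parent table"))
    } else {
      ""
    }
}
```

Throwing here surfaces a broken parent/child flow at the point of failure. Otherwise the child would silently write a table status file without the UUID the parent expects to rename.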
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/2109
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/4268/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by manishgupta88 <gi...@git.apache.org>.
Github user manishgupta88 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178784636
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala ---
@@ -39,6 +41,212 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePostStatusUpdateEvent, LoadTablePreExecutionEvent, LoadTablePreStatusUpdateEvent}
+object AlterTableDropPartitionPreStatusListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val preStatusListener = event.asInstanceOf[AlterTableDropPartitionPreStatusEvent]
+ val carbonTable = preStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ childCommands.foreach(_.processData(SparkSession.getActiveSession.get))
+ }
+ }
+}
+
+trait CommitHelper {
+
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
+ protected def markInProgressSegmentAsDeleted(tableStatusFile: String,
+ operationContext: OperationContext,
+ carbonTable: CarbonTable): Unit = {
+ val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(tableStatusFile)
+ val segmentBeingLoaded =
+ operationContext.getProperty(carbonTable.getTableUniqueName + "_Segment").toString
+ val newDetails = loadMetaDataDetails.collect {
+ case detail if detail.getLoadName.equalsIgnoreCase(segmentBeingLoaded) =>
+ detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
+ detail
+ case others => others
+ }
+ SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusFile, newDetails)
+ }
+
+ /**
+ * Used to rename table status files for commit operation.
+ */
+ protected def renameDataMapTableStatusFiles(sourceFileName: String,
+ destinationFileName: String, uuid: String): Boolean = {
+ val oldCarbonFile = FileFactory.getCarbonFile(sourceFileName)
+ val newCarbonFile = FileFactory.getCarbonFile(destinationFileName)
+ if (oldCarbonFile.exists() && newCarbonFile.exists()) {
+ val backUpPostFix = if (uuid.nonEmpty) {
+ "_backup_" + uuid
+ } else {
+ ""
+ }
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix}")
+ if (newCarbonFile.renameForce(destinationFileName + backUpPostFix)) {
+ LOGGER.info(s"Renaming $oldCarbonFile to $destinationFileName")
+ oldCarbonFile.renameForce(destinationFileName)
+ } else {
+ LOGGER.info(s"Renaming $newCarbonFile to ${destinationFileName + backUpPostFix} failed")
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+ /**
+ * Used to remove table status files with UUID and segment folders.
+ */
+ protected def cleanUpStaleTableStatusFiles(
+ childTables: Seq[CarbonTable],
+ operationContext: OperationContext,
+ uuid: String): Unit = {
+ childTables.foreach { childTable =>
+ val metaDataDir = FileFactory.getCarbonFile(
+ CarbonTablePath.getMetadataPath(childTable.getTablePath))
+ val tableStatusFiles = metaDataDir.listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.contains(uuid) || file.getName.contains("backup")
+ }
+ })
+ tableStatusFiles.foreach(_.delete())
+ }
+ }
+}
+
+object AlterTableDropPartitionPostStatusListener extends OperationEventListener with CommitHelper {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val postStatusListener = event.asInstanceOf[AlterTableDropPartitionPostStatusEvent]
+ val carbonTable = postStatusListener.carbonTable
+ val childDropPartitionCommands = operationContext.getProperty("dropPartitionCommands")
+ val uuid = Option(operationContext.getProperty("uuid")).getOrElse("").toString
+ if (childDropPartitionCommands != null && carbonTable.hasAggregationDataMap) {
+ val childCommands =
+ childDropPartitionCommands.asInstanceOf[Seq[CarbonAlterTableDropHivePartitionCommand]]
+ val renamedDataMaps = childCommands.takeWhile {
+ childCommand =>
+ val childCarbonTable = childCommand.table
+ val oldTableSchemaPath = CarbonTablePath.getTableStatusFilePathWithUUID(
+ childCarbonTable.getTablePath, uuid)
+ // Generate table status file name without UUID, for example: tablestatus
+ val newTableSchemaPath = CarbonTablePath.getTableStatusFilePath(
+ childCarbonTable.getTablePath)
+ renameDataMapTableStatusFiles(oldTableSchemaPath, newTableSchemaPath, uuid)
+ }
+ // if true then the commit for one of the child tables has failed
+ val commitFailed = renamedDataMaps.lengthCompare(childCommands.length) != 0
+ if (commitFailed) {
+ LOGGER.warn("Reverting table status file to original state")
+ renamedDataMaps.foreach {
+ command =>
+ val carbonTable = command.table
+ // rename the backup tablestatus, i.e. tablestatus_backup_UUID, to tablestatus
+ val backupTableSchemaPath =
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath) + "_backup_" + uuid
+ val tableSchemaPath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+ renameDataMapTableStatusFiles(backupTableSchemaPath, tableSchemaPath, "")
+ }
+ }
+ // after success/failure of commit delete all tablestatus files with UUID in their names.
+ // if commit failed then remove the segment directory
+ cleanUpStaleTableStatusFiles(childCommands.map(_.table),
+ operationContext,
+ uuid)
+ if (commitFailed) {
+ sys.error("Failed to update table status for pre-aggregate table")
+ }
+
+ }
+ }
+}
+
+object AlterTableDropPartitionMetaListener extends OperationEventListener{
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override protected def onEvent(event: Event,
+ operationContext: OperationContext) = {
+ val dropPartitionEvent = event.asInstanceOf[AlterTableDropPartitionMetaEvent]
+ val parentCarbonTable = dropPartitionEvent.parentCarbonTable
+ val partitionsToBeDropped = dropPartitionEvent.specs.flatMap(_.keys)
+ val sparkSession = SparkSession.getActiveSession.get
+ if (parentCarbonTable.hasAggregationDataMap) {
+ // used as a flag to block direct drop partition on aggregate tables fired by the user
+ operationContext.setProperty("isInternalDropCall", "true")
+ // Filter out all the tables which don't have the partition being dropped.
+ val childTablesWithoutPartitionColumns =
+ parentCarbonTable.getTableInfo.getDataMapSchemaList.asScala.filter { dataMapSchema =>
+ val childColumns = dataMapSchema.getChildSchema.getListOfColumns.asScala
+ val partitionColExists = partitionsToBeDropped.forall {
+ partition =>
+ childColumns.exists { childColumn =>
+ childColumn.getAggFunction.isEmpty &&
+ childColumn.getParentColumnTableRelations.asScala.head.getColumnName.
+ equals(partition)
+ }
+ }
+ !partitionColExists
+ }
+ if (childTablesWithoutPartitionColumns.nonEmpty) {
+ throw new MetadataProcessException(s"Cannot drop partition as one of the partition is not" +
+ s" participating in the following datamaps ${
+ childTablesWithoutPartitionColumns.toList.map(_.getChildSchema.getTableName)
--- End diff --
Make sure this list is printed in the console output.
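Interpolating `toList.map(...)` directly into the message renders the names through `List.toString`, which produces text such as `List(agg1, agg2)`. Joining the names explicitly keeps the message readable. Below is a minimal sketch. `cannotDropMessage` is a hypothetical helper. Whether the text actually reaches the console depends on how the caller surfaces the `MetadataProcessException`, which this sketch does not cover.

```scala
object DataMapListMessageSketch {
  // Join datamap names explicitly rather than relying on List.toString.
  def cannotDropMessage(dataMapNames: Seq[String]): String =
    "Cannot drop partition as one of the partitions is not participating in the " +
      s"following datamaps: ${dataMapNames.mkString(", ")}"
}
```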
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3534/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3431/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/4739/
---
[GitHub] carbondata issue #2109: [CARBONDATA-2294] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3429/
---
[GitHub] carbondata issue #2109: [WIP] Partition preaggregate support
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/2109
Build Failed with Spark 2.2.1, Please check CI http://88.99.58.216:8080/job/ApacheCarbonPRBuilder/3398/
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by kunal642 <gi...@git.apache.org>.
Github user kunal642 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/2109#discussion_r178237773
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala ---
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.command.partition
import java.util
+import java.util.UUID
--- End diff --
Not allowed
---
[GitHub] carbondata pull request #2109: [CARBONDATA-2294] Partition preaggregate supp...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/2109
---