Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/07 15:51:35 UTC
[1/2] carbondata git commit: [CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table
Repository: carbondata
Updated Branches:
refs/heads/master 6dcf4eb95 -> 2304303ca
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index c602b0a..851b851 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema, TableSchema}
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
@@ -541,4 +541,21 @@ object PreAggregateUtil {
}
}
+ def createChildSelectQuery(tableSchema: TableSchema, databaseName: String): String = {
+ val aggregateColumns = scala.collection.mutable.ArrayBuffer.empty[String]
+ val groupingExpressions = scala.collection.mutable.ArrayBuffer.empty[String]
+ tableSchema.getListOfColumns.asScala.foreach {
+ a => if (a.getAggFunction.nonEmpty) {
+ aggregateColumns += s"${a.getAggFunction match {
+ case "count" => "sum"
+ case _ => a.getAggFunction}}(${a.getColumnName})"
+ } else {
+ groupingExpressions += a.getColumnName
+ }
+ }
+ s"select ${ groupingExpressions.mkString(",") },${ aggregateColumns.mkString(",")
+ } from $databaseName.${ tableSchema.getTableName } group by ${
+ groupingExpressions.mkString(",") }"
+ }
+
}
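For reference, a minimal sketch (with hypothetical column names) of the rollup rule encoded above: a stored "count" aggregate is re-aggregated with sum, because partial counts from the segments being merged must be added together, while every other aggregate function is applied as-is.

    // Hedged sketch of createChildSelectQuery's rollup rule; the column
    // names and aggregate functions below are hypothetical examples.
    val columns = Seq(("maintable_id", ""), ("maintable_age_count", "count"))
    val (aggregates, groupings) = columns.partition(_._2.nonEmpty)
    val aggregateExprs = aggregates.map { case (name, fn) =>
      val rollup = if (fn == "count") "sum" else fn // counts are summed on merge
      s"$rollup($name)"
    }
    // prints: select maintable_id,sum(maintable_age_count)
    //         from compaction.maintable_preagg_avg group by maintable_id
    println(s"select ${(groupings.map(_._1) ++ aggregateExprs).mkString(",")}" +
      s" from compaction.maintable_preagg_avg group by ${groupings.map(_._1).mkString(",")}")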
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 3922e76..d7e9867 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -274,9 +274,16 @@ public final class CarbonLoaderUtil {
if (loadStartEntry) {
String segmentId =
String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
- newMetaEntry.setLoadName(segmentId);
loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
- loadModel.setSegmentId(segmentId);
+ // Segment id would be provided in case this is the compaction flow for an aggregate data map.
+ // If that is true then use the segment id as the load name.
+ if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !loadModel
+ .getSegmentId().isEmpty()) {
+ newMetaEntry.setLoadName(loadModel.getSegmentId());
+ } else {
+ newMetaEntry.setLoadName(segmentId);
+ loadModel.setSegmentId(segmentId);
+ }
// Exception should be thrown if:
// 1. If insert overwrite is in progress and any other load or insert operation
// is triggered
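The branch above can be summarized with a small hedged sketch (Scala, logic only; the real Java method also records the metadata details on the load model):

    // Sketch of the load-name choice: compaction of an aggregate data map
    // pre-assigns the merged segment id, so it is reused as the load name
    // instead of a freshly generated one.
    def resolveLoadName(isChildDataMap: Boolean, presetSegmentId: String,
        newSegmentId: String): String = {
      if (isChildDataMap && presetSegmentId.nonEmpty) presetSegmentId else newSegmentId
    }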
[2/2] carbondata git commit: [CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table
Posted by ra...@apache.org.
[CARBONDATA-1526] [PreAgg] Added support to compact segments in pre-agg table
This PR adds support for compacting pre-aggregate tables.
A pre-aggregate table can be compacted using the alter command, i.e. alter table table_name compact 'minor'/'major'.
If a table with pre-aggregate tables is compacted, then all of its pre-aggregate tables are compacted along with the parent table.
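A usage sketch mirroring the test suite in this patch (table names come from the tests):

    sql("alter table maintable compact 'minor'")            // also compacts maintable_preagg_*
    sql("alter table maintable_preagg_sum compact 'minor'") // direct compaction of a pre-agg table
    sql("alter table maintable_preagg_sum compact 'major'")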
This closes #1605
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2304303c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2304303c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2304303c
Branch: refs/heads/master
Commit: 2304303ca4917b087159ae9888c8bddbb761b048
Parents: 6dcf4eb
Author: kunal642 <ku...@gmail.com>
Authored: Wed Nov 22 19:33:37 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Dec 7 21:20:16 2017 +0530
----------------------------------------------------------------------
.../TestPreAggregateCompaction.scala | 181 ++++++++++++++
.../spark/compaction/CompactionCallable.java | 44 ----
.../org/apache/carbondata/api/CarbonStore.scala | 6 +-
.../carbondata/events/AlterTableEvents.scala | 64 ++---
.../org/apache/carbondata/events/Events.scala | 4 +-
.../apache/carbondata/events/LoadEvents.scala | 8 +
.../apache/carbondata/spark/rdd/Compactor.scala | 167 -------------
.../spark/rdd/DataManagementFunc.scala | 225 ------------------
.../carbondata/spark/util/DataLoadingUtil.scala | 75 +++++-
.../spark/rdd/AggregateDataMapCompactor.scala | 118 +++++++++
.../spark/rdd/CarbonDataRDDFactory.scala | 34 +--
.../spark/rdd/CarbonTableCompactor.scala | 238 +++++++++++++++++++
.../spark/rdd/CompactionFactory.scala | 53 +++++
.../apache/carbondata/spark/rdd/Compactor.scala | 63 +++++
.../org/apache/spark/sql/CarbonSession.scala | 4 +-
.../management/CarbonLoadDataCommand.scala | 6 +-
.../preaaggregate/PreAggregateListeners.scala | 38 ++-
.../preaaggregate/PreAggregateUtil.scala | 19 +-
.../processing/util/CarbonLoaderUtil.java | 11 +-
19 files changed, 864 insertions(+), 494 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
new file mode 100644
index 0000000..89cf8eb
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateCompaction.scala
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.scalatest.Matchers._
+
+class TestPreAggregateCompaction extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+
+ val testData = s"$resourcesPath/sample.csv"
+
+ override def beforeEach(): Unit = {
+ sql("drop database if exists compaction cascade")
+ sql("create database if not exists compaction")
+ sql("use compaction")
+ sql("create table testtable (id int, name string, city string, age int) STORED BY 'org.apache.carbondata.format'")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(
+ s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
+ .stripMargin)
+ sql(
+ s"""create datamap preagg_avg on table maintable using 'preaggregate' as select id,avg(age) from maintable group by id"""
+ .stripMargin)
+ }
+
+ test("test if pre-agg table is compacted with parent table minor compaction") {
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("insert into testtable select * from maintable")
+ val sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ val avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+ sql("alter table maintable compact 'minor'")
+ val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+ checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+ val segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+ segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0"))
+ checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
+ }
+
+ test("test if pre-agg table is compacted with parent table major compaction") {
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable compact 'major'")
+ sql("insert into testtable select * from maintable")
+ val sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ val avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+ sql("alter table maintable compact 'minor'")
+ val segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+ checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+ val segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+ segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0"))
+ checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
+ }
+
+ test("test if 2nd level minor compaction is successful for pre-agg table") {
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable compact 'minor'")
+ var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+ sql("insert into testtable select * from maintable")
+ var sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ var avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+ checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+ var segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+ segmentNamesAvg should equal (Array("3", "2", "1", "0.1", "0"))
+ checkAnswer(sql("select * from maintable_preagg_avg"), avgResult)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable compact 'minor'")
+ sql("insert overwrite table testtable select * from maintable")
+ sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+ segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7"))
+ checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+ segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+ segmentNamesAvg.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7"))
+ checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgResult)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable compact 'minor'")
+ sql("insert overwrite table testtable select * from maintable")
+ sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ avgResult = sql("select id, sum(age), count(age) from testtable group by id").collect()
+ segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0"))
+ checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+ segmentNamesAvg = sql("show segments for table maintable_preagg_avg").collect().map(_.get(0).toString)
+ segmentNamesAvg should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0"))
+ checkAnswer(sql("select maintable_id, sum(maintable_age_sum), sum(maintable_age_count) from maintable_preagg_avg group by maintable_id"), avgResult)
+ }
+
+ test("test direct minor compaction on pre-agg tables") {
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable_preagg_sum compact 'minor'")
+ var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum should equal (Array("3", "2", "1", "0.1", "0"))
+ sql("insert into testtable select * from maintable")
+ var sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ checkAnswer(sql("select * from maintable_preagg_sum"), sumResult)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable_preagg_sum compact 'minor'")
+ sql("insert overwrite table testtable select * from maintable")
+ sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum.sorted should equal (Array("0", "0.1", "1", "2", "3", "4", "4.1", "5", "6", "7"))
+ checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable_preagg_sum compact 'minor'")
+ sql("insert overwrite table testtable select * from maintable")
+ sumResult = sql("select id, sum(age) from testtable group by id").collect()
+ segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum should equal (Array("11", "10", "9", "8.1", "8", "7", "6", "5", "4.1", "4", "3", "2", "1", "0.2", "0.1", "0"))
+ checkAnswer(sql("select maintable_id, sum(maintable_age_sum) from maintable_preagg_sum group by maintable_id"), sumResult)
+ }
+
+ test("test if minor/major compaction is successful for pre-agg table") {
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable_preagg_sum compact 'minor'")
+ var segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum should equal (Array("3","2","1","0.1", "0"))
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql("alter table maintable_preagg_sum compact 'major'")
+ segmentNamesSum = sql("show segments for table maintable_preagg_sum").collect().map(_.get(0).toString)
+ segmentNamesSum.sorted should equal (Array("0", "0.1", "0.2", "1", "2", "3", "4", "5", "6", "7"))
+ }
+
+ override def afterAll(): Unit = {
+ sql("drop database if exists compaction cascade")
+ sql("use default")
+ }
+
+}
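The segment-name assertions above depend on Carbon's merged-segment naming, which (as the expected arrays suggest) appends a compaction level to the starting segment id; a hedged sketch of that convention:

    // Assumed from the expected arrays: "0.1" is the first-level minor merge
    // starting at segment 0, and a second-level merge of 0.1, 4.1 and 8.1
    // yields "0.2".
    def mergedSegmentName(startSegment: String, level: Int): String = s"$startSegment.$level"
    println(mergedSegmentName("0", 1)) // 0.1 after the first minor compaction
    println(mergedSegmentName("0", 2)) // 0.2 after the second-level merge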
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
deleted file mode 100644
index 2773eef..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/compaction/CompactionCallable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.compaction;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.spark.rdd.Compactor;
-
-import org.apache.spark.sql.execution.command.CompactionCallableModel;
-
-/**
- * Callable class which is used to trigger the compaction in a separate callable.
- */
-public class CompactionCallable implements Callable<Void> {
-
- private final CompactionCallableModel compactionCallableModel;
-
- public CompactionCallable(CompactionCallableModel compactionCallableModel) {
-
- this.compactionCallableModel = compactionCallableModel;
- }
-
- @Override public Void call() throws Exception {
-
- Compactor.triggerCompaction(compactionCallableModel);
- return null;
-
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index f4f569b..2b127e4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -22,9 +22,7 @@ import java.lang.Long
import scala.collection.JavaConverters._
import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.types.TimestampType
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.unsafe.types.UTF8String
@@ -37,7 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.DataManagementFunc
+import org.apache.carbondata.spark.util.DataLoadingUtil
object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -127,7 +125,7 @@ object CarbonStore {
FileFactory.getCarbonFile(absIdent.getTablePath,
FileFactory.getFileType(absIdent.getTablePath)))
} else {
- DataManagementFunc.deleteLoadsAndUpdateMetadata(
+ DataLoadingUtil.deleteLoadsAndUpdateMetadata(
isForceDeletion = true, carbonTable)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index 1a0c305..7caad43 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -18,7 +18,7 @@ package org.apache.carbondata.events
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -29,7 +29,8 @@ import org.apache.carbondata.processing.loading.model.CarbonLoadModel
* @param alterTableDropColumnModel
* @param sparkSession
*/
-case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable,
+case class AlterTableDropColumnPreEvent(
+ carbonTable: CarbonTable,
alterTableDropColumnModel: AlterTableDropColumnModel,
sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
@@ -40,7 +41,9 @@ case class AlterTableDropColumnPreEvent(carbonTable: CarbonTable,
* @param carbonTable
* @param alterTableDataTypeChangeModel
*/
-case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableDataTypeChangePreEvent(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
extends Event with AlterTableDataTypeChangeEventInfo
@@ -50,7 +53,9 @@ case class AlterTableDataTypeChangePreEvent(sparkSession: SparkSession, carbonTa
* @param carbonTable
* @param alterTableDataTypeChangeModel
*/
-case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableDataTypeChangePostEvent(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
alterTableDataTypeChangeModel: AlterTableDataTypeChangeModel)
extends Event with AlterTableDataTypeChangeEventInfo
@@ -60,7 +65,8 @@ case class AlterTableDataTypeChangePostEvent(sparkSession: SparkSession, carbonT
* @param alterTableDropColumnModel
* @param sparkSession
*/
-case class AlterTableDropColumnPostEvent(carbonTable: CarbonTable,
+case class AlterTableDropColumnPostEvent(
+ carbonTable: CarbonTable,
alterTableDropColumnModel: AlterTableDropColumnModel,
sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
@@ -71,7 +77,8 @@ case class AlterTableDropColumnPostEvent(carbonTable: CarbonTable,
* @param alterTableDropColumnModel
* @param sparkSession
*/
-case class AlterTableDropColumnAbortEvent(carbonTable: CarbonTable,
+case class AlterTableDropColumnAbortEvent(
+ carbonTable: CarbonTable,
alterTableDropColumnModel: AlterTableDropColumnModel,
sparkSession: SparkSession) extends Event with AlterTableDropColumnEventInfo
@@ -83,7 +90,8 @@ case class AlterTableDropColumnAbortEvent(carbonTable: CarbonTable,
* @param newTablePath
* @param sparkSession
*/
-case class AlterTableRenamePreEvent(carbonTable: CarbonTable,
+case class AlterTableRenamePreEvent(
+ carbonTable: CarbonTable,
alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
@@ -92,7 +100,9 @@ case class AlterTableRenamePreEvent(carbonTable: CarbonTable,
* @param carbonTable
* @param alterTableAddColumnsModel
*/
-case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableAddColumnPreEvent(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
alterTableAddColumnsModel: AlterTableAddColumnsModel)
extends Event with AlterTableAddColumnEventInfo
@@ -101,7 +111,9 @@ case class AlterTableAddColumnPreEvent(sparkSession: SparkSession, carbonTable:
* @param carbonTable
* @param alterTableAddColumnsModel
*/
-case class AlterTableAddColumnPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
+case class AlterTableAddColumnPostEvent(
+ sparkSession: SparkSession,
+ carbonTable: CarbonTable,
alterTableAddColumnsModel: AlterTableAddColumnsModel)
extends Event with AlterTableAddColumnEventInfo
@@ -113,7 +125,8 @@ case class AlterTableAddColumnPostEvent(sparkSession: SparkSession, carbonTable:
* @param newTablePath
* @param sparkSession
*/
-case class AlterTableRenamePostEvent(carbonTable: CarbonTable,
+case class AlterTableRenamePostEvent(
+ carbonTable: CarbonTable,
alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
@@ -125,33 +138,29 @@ case class AlterTableRenamePostEvent(carbonTable: CarbonTable,
* @param newTablePath
* @param sparkSession
*/
-case class AlterTableRenameAbortEvent(carbonTable: CarbonTable,
+case class AlterTableRenameAbortEvent(
+ carbonTable: CarbonTable,
alterTableRenameModel: AlterTableRenameModel, newTablePath: String,
sparkSession: SparkSession) extends Event with AlterTableRenameEventInfo
-/**
- *
- * @param carbonTable
- * @param carbonLoadModel
- * @param mergedLoadName
- * @param sQLContext
- */
-case class AlterTableCompactionPreEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
- carbonLoadModel: CarbonLoadModel,
+case class AlterTableCompactionPreEvent(
+ carbonTable: CarbonTable,
+ carbonMergerMapping: CarbonMergerMapping,
mergedLoadName: String,
- sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
+ sqlContext: SQLContext) extends Event with AlterTableCompactionEventInfo
/**
*
* @param carbonTable
- * @param carbonLoadModel
+ * @param carbonMergerMapping
* @param mergedLoadName
* @param sQLContext
*/
-case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable: CarbonTable,
- carbonLoadModel: CarbonLoadModel,
+case class AlterTableCompactionPostEvent(
+ carbonTable: CarbonTable,
+ carbonMergerMapping: CarbonMergerMapping,
mergedLoadName: String,
sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
@@ -160,11 +169,12 @@ case class AlterTableCompactionPostEvent(sparkSession: SparkSession, carbonTable
* Class for handling clean up in case of any failure and abort the operation
*
* @param carbonTable
- * @param carbonLoadModel
+ * @param carbonMergerMapping
* @param mergedLoadName
* @param sQLContext
*/
-case class AlterTableCompactionAbortEvent(carbonTable: CarbonTable,
- carbonLoadModel: CarbonLoadModel,
+case class AlterTableCompactionAbortEvent(
+ carbonTable: CarbonTable,
+ carbonMergerMapping: CarbonMergerMapping,
mergedLoadName: String,
sQLContext: SQLContext) extends Event with AlterTableCompactionEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 6279fca..4af337b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -18,7 +18,7 @@
package org.apache.carbondata.events
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -95,7 +95,7 @@ trait AlterTableAddColumnEventInfo {
*/
trait AlterTableCompactionEventInfo {
val carbonTable: CarbonTable
- val carbonLoadModel: CarbonLoadModel
+ val carbonMergerMapping: CarbonMergerMapping
val mergedLoadName: String
}
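A hedged sketch of a listener consuming the reshaped compaction events, assuming the OperationEventListener trait used elsewhere in this module:

    import org.apache.carbondata.events.{AlterTableCompactionPostEvent, Event,
      OperationContext, OperationEventListener}

    class CompactionAuditListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        event match {
          case e: AlterTableCompactionPostEvent =>
            // carbonMergerMapping now replaces carbonLoadModel on the event
            println(s"segments merged into load ${e.mergedLoadName}")
          case _ =>
        }
      }
    }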
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
index 12f2922..84dde84 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/LoadEvents.scala
@@ -45,6 +45,14 @@ case class LoadTablePostExecutionEvent(sparkSession: SparkSession,
carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
/**
+ * Class for handling operations after data load completion and before final commit of load
+ * operation. Example usage: For loading pre-aggregate tables
+ */
+case class LoadTablePreStatusUpdateEvent(sparkSession: SparkSession,
+ carbonTableIdentifier: CarbonTableIdentifier,
+ carbonLoadModel: CarbonLoadModel) extends Event with LoadEventInfo
+
+/**
* Class for handling clean up in case of any failure and abort the operation.
*/
case class LoadTableAbortExecutionEvent(sparkSession: SparkSession,
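Firing the new event from a load flow would look roughly like this (session, identifier, load model and operation context assumed to be in scope):

    import org.apache.carbondata.events.{LoadTablePreStatusUpdateEvent, OperationListenerBus}

    // fired after the data load completes but before the status file is committed
    val preStatusUpdateEvent = LoadTablePreStatusUpdateEvent(
      sparkSession, carbonTableIdentifier, carbonLoadModel)
    OperationListenerBus.getInstance.fireEvent(preStatusUpdateEvent, operationContext)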
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
deleted file mode 100644
index e41211a..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.events._
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Compactor class which handled the compaction cases.
- */
-object Compactor {
-
- val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
-
- def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
-
- val carbonTable = compactionCallableModel.carbonTable
- val loadsToMerge = compactionCallableModel.loadsToMerge
- val sc = compactionCallableModel.sqlContext
- val carbonLoadModel = compactionCallableModel.carbonLoadModel
- val compactionType = compactionCallableModel.compactionType
- val storePath = carbonLoadModel.getTablePath
- val startTime = System.nanoTime()
- val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
- var finalMergeStatus = false
- val databaseName: String = carbonLoadModel.getDatabaseName
- val factTableName = carbonLoadModel.getTableName
- val validSegments: Array[String] = CarbonDataMergerUtil
- .getValidSegments(loadsToMerge).split(',')
- val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
- val carbonMergerMapping = CarbonMergerMapping(storePath,
- carbonTable.getMetaDataFilepath,
- mergedLoadName,
- databaseName,
- factTableName,
- validSegments,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
- compactionType,
- maxSegmentColCardinality = null,
- maxSegmentColumnSchemaList = null
- )
- carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
- carbonLoadModel.setLoadMetadataDetails(
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
- // trigger event for compaction
- val operationContext = new OperationContext
- val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
- AlterTableCompactionPreEvent(compactionCallableModel.sqlContext.sparkSession,
- carbonTable,
- carbonLoadModel,
- mergedLoadName,
- sc)
- OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
-
- var execInstance = "1"
- // in case of non dynamic executor allocation, number of executors are fixed.
- if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
- execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
- logger.info(s"spark.executor.instances property is set to = $execInstance")
- } // in case of dynamic executor allocation, taking the max executors of the dynamic allocation.
- else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
- if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
- .equalsIgnoreCase("true")) {
- execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
- logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
- }
- }
-
- val mergeStatus =
- if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
- new CarbonIUDMergerRDD(
- sc.sparkContext,
- new MergeResultImpl(),
- carbonLoadModel,
- carbonMergerMapping,
- execInstance
- ).collect
- } else {
- new CarbonMergerRDD(
- sc.sparkContext,
- new MergeResultImpl(),
- carbonLoadModel,
- carbonMergerMapping,
- execInstance
- ).collect
- }
-
- if (mergeStatus.length == 0) {
- finalMergeStatus = false
- } else {
- finalMergeStatus = mergeStatus.forall(_._2)
- }
-
- if (finalMergeStatus) {
- val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
- CommonUtil.mergeIndexFiles(
- sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable, false)
-
- // trigger event for compaction
- val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
- AlterTableCompactionPostEvent(compactionCallableModel.sqlContext.sparkSession,
- carbonTable,
- carbonLoadModel,
- mergedLoadName,
- sc)
- OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
-
- val endTime = System.nanoTime()
- logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
- val statusFileUpdation =
- ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
- CarbonDataMergerUtil
- .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
- carbonTable.getMetaDataFilepath,
- carbonLoadModel)) ||
- CarbonDataMergerUtil
- .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
- mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType)
-
- if (!statusFileUpdation) {
- logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }")
- logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }")
- throw new Exception(s"Compaction failed to update metadata for table" +
- s" ${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }")
- } else {
- logger.audit(s"Compaction request completed for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- logger.info(s"Compaction request completed for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- }
- } else {
- logger.audit(s"Compaction request failed for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
- )
- logger.error(s"Compaction request failed for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- throw new Exception("Compaction Failure in Merger Rdd.")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
deleted file mode 100644
index 26a66f6..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.util
-import java.util.concurrent._
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CompactionCallableModel, CompactionModel}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders}
-import org.apache.carbondata.spark.compaction.CompactionCallable
-import org.apache.carbondata.spark.util.CommonUtil
-
-/**
- * Common functions for data life cycle management
- */
-object DataManagementFunc {
-
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- def executeCompaction(carbonLoadModel: CarbonLoadModel,
- compactionModel: CompactionModel,
- executor: ExecutorService,
- sqlContext: SQLContext,
- storeLocation: String): Unit = {
- val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
- carbonLoadModel.getLoadMetadataDetails
- )
- CarbonDataMergerUtil.sortSegments(sortedSegments)
-
- var segList = carbonLoadModel.getLoadMetadataDetails
- var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- carbonLoadModel,
- compactionModel.compactionSize,
- segList,
- compactionModel.compactionType
- )
- while (loadsToMerge.size() > 1 ||
- (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
- loadsToMerge.size() > 0)) {
- val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
- deletePartialLoadsInCompaction(carbonLoadModel)
- val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
- CarbonCommonConstants
- .DEFAULT_COLLECTION_SIZE
- )
-
- scanSegmentsAndSubmitJob(futureList,
- loadsToMerge,
- executor,
- sqlContext,
- compactionModel,
- carbonLoadModel
- )
-
- try {
-
- futureList.asScala.foreach(future => {
- future.get
- }
- )
- } catch {
- case e: Exception =>
- LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
- throw e
- }
-
- // scan again and determine if anything is there to merge again.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- segList = carbonLoadModel.getLoadMetadataDetails
- // in case of major compaction we will scan only once and come out as it will keep
- // on doing major for the new loads also.
- // excluding the newly added segments.
- if (CompactionType.MAJOR == compactionModel.compactionType) {
-
- segList = CarbonDataMergerUtil
- .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
- }
-
- if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
- loadsToMerge.clear()
- } else if (segList.size > 0) {
- loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- carbonLoadModel,
- compactionModel.compactionSize,
- segList,
- compactionModel.compactionType
- )
- }
- else {
- loadsToMerge.clear()
- }
- }
- }
-
- /**
- * This will submit the loads to be merged into the executor.
- */
- private def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]],
- loadsToMerge: util.List[LoadMetadataDetails],
- executor: ExecutorService,
- sqlContext: SQLContext,
- compactionModel: CompactionModel,
- carbonLoadModel: CarbonLoadModel
- ): Unit = {
- loadsToMerge.asScala.foreach { seg =>
- LOGGER.info("loads identified for merge is " + seg.getLoadName)
- }
-
- val compactionCallableModel = CompactionCallableModel(
- carbonLoadModel,
- compactionModel.carbonTable,
- loadsToMerge,
- sqlContext,
- compactionModel.compactionType
- )
-
- val future: Future[Void] = executor.submit(new CompactionCallable(compactionCallableModel))
- futureList.add(future)
- }
-
- def deletePartialLoadsInCompaction(carbonLoadModel: CarbonLoadModel): Unit = {
- // Deleting the any partially loaded data if present.
- // in some case the segment folder which is present in store will not have entry in
- // status.
- // so deleting those folders.
- try {
- CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
- } catch {
- case e: Exception =>
- LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
- s" ${ e.getMessage }")
- }
- }
-
- private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
- val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
- if (details != null && details.nonEmpty) for (oneRow <- details) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
- SegmentStatus.COMPACTED == oneRow.getSegmentStatus) &&
- oneRow.getVisibility.equalsIgnoreCase("true")) {
- return true
- }
- }
- false
- }
-
- def deleteLoadsAndUpdateMetadata(
- isForceDeletion: Boolean,
- carbonTable: CarbonTable): Unit = {
- if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
- val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val carbonTableStatusLock =
- CarbonLockFactory.getCarbonLockObj(
- absoluteTableIdentifier,
- LockUsage.TABLE_STATUS_LOCK
- )
-
- // Delete marked loads
- val isUpdationRequired =
- DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
- absoluteTableIdentifier,
- isForceDeletion,
- details
- )
-
- if (isUpdationRequired) {
- try {
- // Update load metadate file after cleaning deleted nodes
- if (carbonTableStatusLock.lockWithRetries()) {
- LOGGER.info("Table status lock has been successfully acquired.")
-
- // read latest table status again.
- val latestMetadata = SegmentStatusManager
- .readLoadMetadata(carbonTable.getMetaDataFilepath)
-
- // update the metadata details from old to new status.
- val latestStatus = CarbonLoaderUtil
- .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
- CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
- } else {
- val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
- val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
- val errorMsg = "Clean files request is failed for " +
- s"$dbName.$tableName" +
- ". Not able to acquire the table status lock due to other operation " +
- "running in the background."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
- }
- } finally {
- CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 74ed6a6..69c9fe4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.load.ValidateUtil
@@ -41,6 +43,8 @@ import org.apache.carbondata.spark.load.ValidateUtil
*/
object DataLoadingUtil {
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
/**
* get data loading options and initialise default value
*/
@@ -54,7 +58,6 @@ object DataLoadingUtil {
optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
optionsFinal.put(
"serialization_null_format",
options.getOrElse("serialization_null_format", "\\N"))
@@ -321,4 +324,70 @@ object DataLoadingUtil {
CommonUtil.readLoadMetadataDetails(carbonLoadModel)
}
}
+
+ private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
+ val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+ if (details != null && details.nonEmpty) for (oneRow <- details) {
+ if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
+ SegmentStatus.COMPACTED == oneRow.getSegmentStatus) &&
+ oneRow.getVisibility.equalsIgnoreCase("true")) {
+ return true
+ }
+ }
+ false
+ }
+
+ def deleteLoadsAndUpdateMetadata(
+ isForceDeletion: Boolean,
+ carbonTable: CarbonTable): Unit = {
+ if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
+ val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+ val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+ val carbonTableStatusLock =
+ CarbonLockFactory.getCarbonLockObj(
+ absoluteTableIdentifier,
+ LockUsage.TABLE_STATUS_LOCK
+ )
+
+ // Delete marked loads
+ val isUpdationRequired =
+ DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+ absoluteTableIdentifier,
+ isForceDeletion,
+ details
+ )
+
+ if (isUpdationRequired) {
+ try {
+ // Update load metadata file after cleaning deleted nodes
+ if (carbonTableStatusLock.lockWithRetries()) {
+ LOGGER.info("Table status lock has been successfully acquired.")
+
+ // read latest table status again.
+ val latestMetadata = SegmentStatusManager
+ .readLoadMetadata(carbonTable.getMetaDataFilepath)
+
+ // update the metadata details from old to new status.
+ val latestStatus = CarbonLoaderUtil
+ .updateLoadMetadataFromOldToNew(details, latestMetadata)
+
+ CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
+ } else {
+ val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
+ val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
+ val errorMsg = "Clean files request is failed for " +
+ s"$dbName.$tableName" +
+ ". Not able to acquire the table status lock due to other operation " +
+ "running in the background."
+ LOGGER.audit(errorMsg)
+ LOGGER.error(errorMsg)
+ throw new Exception(errorMsg + " Please try after some time.")
+ }
+ } finally {
+ CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
+ }
+ }
+ }
+ }
+
}
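The core of the relocated deleteLoadsAndUpdateMetadata is the table-status lock protocol; a reduced sketch (carbonTable assumed in scope):

    import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}

    // rewrite the status file only while holding TABLE_STATUS_LOCK,
    // and always release the lock afterwards
    val statusLock = CarbonLockFactory.getCarbonLockObj(
      carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
    try {
      if (statusLock.lockWithRetries()) {
        // re-read the latest table status and write the merged view back
      } else {
        throw new Exception("Not able to acquire the table status lock.")
      }
    } finally {
      CarbonLockUtil.fileUnlock(statusLock, LockUsage.TABLE_STATUS_LOCK)
    }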
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
new file mode 100644
index 0000000..636d731
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/AggregateDataMapCompactor.scala
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonSession, SQLContext}
+import org.apache.spark.sql.execution.command.CompactionModel
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+
+/**
+ * Used to perform compaction on Aggregate data map.
+ */
+class AggregateDataMapCompactor(carbonLoadModel: CarbonLoadModel,
+ compactionModel: CompactionModel,
+ executor: ExecutorService,
+ sqlContext: SQLContext,
+ storeLocation: String)
+ extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+
+ override def executeCompaction(): Unit = {
+ val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val loadMetaDataDetails = identifySegmentsToBeMerged()
+ val segments = loadMetaDataDetails.asScala.map(_.getLoadName)
+ if (segments.nonEmpty) {
+ val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadMetaDataDetails).split("_")(1)
+ CarbonSession.threadSet(
+ CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName,
+ segments.mkString(","))
+ CarbonSession.threadSet(
+ CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName, "false")
+ val headers = carbonTable.getTableInfo.getFactTable.getListOfColumns.asScala
+ .map(_.getColumnName).mkString(",")
+ // Creating a new query string to insert data into the pre-aggregate table from that
+ // same table. For example, to compact preaggtable1 we can fire a query like
+ // "insert into preaggtable1 select * from preaggtable1".
+ // The following code will generate the select query with a load UDF that will be used to
+ // apply DataLoadingRules
+ val childDataFrame = sqlContext.sparkSession.sql(new CarbonSpark2SqlParser()
+ // adding the aggregation load UDF
+ .addPreAggLoadFunction(
+ // creating the select query based on the table schema
+ PreAggregateUtil.createChildSelectQuery(
+ carbonTable.getTableInfo.getFactTable, carbonTable.getDatabaseName))).drop("preAggLoad")
+ try {
+ CarbonLoadDataCommand(
+ Some(carbonTable.getDatabaseName),
+ carbonTable.getTableName,
+ null,
+ Nil,
+ Map("fileheader" -> headers),
+ isOverwriteTable = false,
+ dataFrame = Some(childDataFrame),
+ internalOptions = Map(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
+ "mergedSegmentName" -> mergedLoadName)).run(sqlContext.sparkSession)
+ val newLoadMetaDataDetails = SegmentStatusManager.readLoadMetadata(
+ carbonTable.getMetaDataFilepath)
+ val updatedLoadMetaDataDetails = newLoadMetaDataDetails collect {
+ case load if loadMetaDataDetails.contains(load) =>
+ load.setMergedLoadName(mergedLoadName)
+ load.setSegmentStatus(SegmentStatus.COMPACTED)
+ load.setModificationOrdeletionTimesStamp(System.currentTimeMillis())
+ load
+ case other => other
+ }
+ val carbonTablePath = CarbonStorePath
+ .getCarbonTablePath(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ .getAbsoluteTableIdentifier)
+ SegmentStatusManager
+ .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath,
+ updatedLoadMetaDataDetails)
+ carbonLoadModel.setLoadMetadataDetails(updatedLoadMetaDataDetails.toList.asJava)
+ } finally {
+ // Check whether any other segments need compaction, in case of minor compaction.
+ // For example: after 8.1 is created, 0.1, 4.1 and 8.1 have to be merged to 0.2 if the
+ // threshold allows it.
+ if (!compactionModel.compactionType.equals(CompactionType.MAJOR)) {
+ executeCompaction()
+ }
+ CarbonSession
+ .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName)
+ CarbonSession.threadUnset(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
+ carbonLoadModel.getDatabaseName + "." +
+ carbonLoadModel.getTableName)
+ }
+ }
+ }
+}
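For readers following the flow above: a minimal sketch of the segment scoping that AggregateDataMapCompactor performs, assuming a hypothetical pre-aggregate table default.preaggtable1 whose segments 0 to 3 were picked for the merge (the table name and segment ids are illustrative only):

    import org.apache.spark.sql.CarbonSession
    import org.apache.carbondata.core.constants.CarbonCommonConstants

    object CompactionScopingSketch {
      // Restrict reads on dbTable to the given segments for this thread, mirroring
      // the threadSet calls in AggregateDataMapCompactor.executeCompaction.
      def scopeToSegments(dbTable: String, segments: Seq[String]): Unit = {
        CarbonSession.threadSet(
          CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbTable, segments.mkString(","))
        // Disable validation so the scoped segments are read as-is.
        CarbonSession.threadSet(
          CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS + dbTable, "false")
      }
    }

    // Usage (hypothetical names): after this call, a query such as
    //   insert into preaggtable1 select * from preaggtable1
    // reads only segments 0,1,2,3 of the pre-aggregate table.
    // CompactionScopingSketch.scopeToSegments("default.preaggtable1", Seq("0", "1", "2", "3"))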
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 6393289..1d2934f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -55,7 +55,7 @@ import org.apache.carbondata.core.scan.partition.PartitionUtil
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{LoadTablePostExecutionEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events.{LoadTablePostExecutionEvent, LoadTablePreStatusUpdateEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, StringArrayWritable}
@@ -66,7 +66,7 @@ import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonData
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory, _}
import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, Util}
/**
* This is the factory class which can create different RDD depends on user needs.
@@ -161,18 +161,18 @@ object CarbonDataRDDFactory {
val compactionThread = new Thread {
override def run(): Unit = {
+ val compactor = CompactionFactory.getCompactor(
+ carbonLoadModel,
+ compactionModel,
+ executor,
+ sqlContext,
+ storeLocation)
try {
// compaction status of the table which is triggered by the user.
var triggeredCompactionStatus = false
var exception: Exception = null
try {
- DataManagementFunc.executeCompaction(
- carbonLoadModel,
- compactionModel,
- executor,
- sqlContext,
- storeLocation
- )
+ compactor.executeCompaction()
triggeredCompactionStatus = true
} catch {
case e: Exception =>
@@ -211,10 +211,12 @@ object CarbonDataRDDFactory {
)
// proceed for compaction
try {
- DataManagementFunc.executeCompaction(newCarbonLoadModel,
+ CompactionFactory.getCompactor(
+ newCarbonLoadModel,
newcompactionModel,
- executor, sqlContext, storeLocation
- )
+ executor,
+ sqlContext,
+ storeLocation).executeCompaction()
} catch {
case e: Exception =>
LOGGER.error("Exception in compaction thread for table " +
@@ -248,7 +250,7 @@ object CarbonDataRDDFactory {
}
} finally {
executor.shutdownNow()
- DataManagementFunc.deletePartialLoadsInCompaction(carbonLoadModel)
+ compactor.deletePartialLoadsInCompaction()
compactionLock.unlock()
}
}
@@ -290,7 +292,7 @@ object CarbonDataRDDFactory {
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
// Check if any load need to be deleted before loading new data
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- DataManagementFunc.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
+ DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
@@ -492,6 +494,10 @@ object CarbonDataRDDFactory {
throw new Exception("No Data to load")
}
writeDictionary(carbonLoadModel, result, writeAll = false)
+ val loadTablePreStatusUpdateEvent = LoadTablePreStatusUpdateEvent(sqlContext.sparkSession,
+ carbonTable.getCarbonTableIdentifier,
+ carbonLoadModel)
+ OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent)
val done = updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
if (!done) {
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
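The new LoadTablePreStatusUpdateEvent fires after the dictionary is written but before updateTableStatus runs. A minimal sketch of a listener for it, assuming only the event/bus API visible in this patch (the listener name and log output are hypothetical):

    import org.apache.carbondata.events.{Event, LoadTablePreStatusUpdateEvent,
      OperationContext, OperationEventListener, OperationListenerBus}

    // Hypothetical listener; runs before the load's table status entry is written.
    object StatusUpdateAuditListener extends OperationEventListener {
      override def onEvent(event: Event, operationContext: OperationContext): Unit = {
        val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
        val model = loadEvent.carbonLoadModel
        println(s"about to update status for " +
          s"${model.getDatabaseName}.${model.getTableName}")
      }
    }

    // Wiring, in the style of CarbonSession.initListeners further below:
    // OperationListenerBus.getInstance()
    //   .addListener(classOf[LoadTablePreStatusUpdateEvent], StatusUpdateAuditListener)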
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
new file mode 100644
index 0000000..3ebc957
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util
+import java.util.concurrent.ExecutorService
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
+
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
+import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * This class is used to perform compaction on a carbon table.
+ */
+class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
+ compactionModel: CompactionModel,
+ executor: ExecutorService,
+ sqlContext: SQLContext,
+ storeLocation: String)
+ extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {
+
+ override def executeCompaction(): Unit = {
+ val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
+ carbonLoadModel.getLoadMetadataDetails
+ )
+ CarbonDataMergerUtil.sortSegments(sortedSegments)
+
+ var segList = carbonLoadModel.getLoadMetadataDetails
+ var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+ carbonLoadModel,
+ compactionModel.compactionSize,
+ segList,
+ compactionModel.compactionType
+ )
+ while (loadsToMerge.size() > 1 ||
+ (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
+ loadsToMerge.size() > 0)) {
+ val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
+ deletePartialLoadsInCompaction()
+
+ try {
+ scanSegmentsAndSubmitJob(loadsToMerge)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e, s"Exception in compaction thread ${ e.getMessage }")
+ throw e
+ }
+
+ // scan again and determine if anything is there to merge again.
+ CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ segList = carbonLoadModel.getLoadMetadataDetails
+ // In case of major compaction we scan only once and come out, as otherwise it would
+ // keep doing major compaction for the new loads as well;
+ // the newly added segments are excluded below.
+ if (CompactionType.MAJOR == compactionModel.compactionType) {
+
+ segList = CarbonDataMergerUtil
+ .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
+ }
+
+ if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType) {
+ loadsToMerge.clear()
+ } else if (segList.size > 0) {
+ loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+ carbonLoadModel,
+ compactionModel.compactionSize,
+ segList,
+ compactionModel.compactionType
+ )
+ } else {
+ loadsToMerge.clear()
+ }
+ }
+ }
+
+ /**
+ * This will submit the loads to be merged into the executor.
+ */
+ def scanSegmentsAndSubmitJob(loadsToMerge: util.List[LoadMetadataDetails]): Unit = {
+ loadsToMerge.asScala.foreach { seg =>
+ LOGGER.info("loads identified for merge is " + seg.getLoadName)
+ }
+ val compactionCallableModel = CompactionCallableModel(
+ carbonLoadModel,
+ compactionModel.carbonTable,
+ loadsToMerge,
+ sqlContext,
+ compactionModel.compactionType)
+ triggerCompaction(compactionCallableModel)
+ }
+
+ private def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+ val carbonTable = compactionCallableModel.carbonTable
+ val loadsToMerge = compactionCallableModel.loadsToMerge
+ val sc = compactionCallableModel.sqlContext
+ val carbonLoadModel = compactionCallableModel.carbonLoadModel
+ val compactionType = compactionCallableModel.compactionType
+ val tablePath = carbonLoadModel.getTablePath
+ val startTime = System.nanoTime()
+ val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
+ var finalMergeStatus = false
+ val databaseName: String = carbonLoadModel.getDatabaseName
+ val factTableName = carbonLoadModel.getTableName
+ val validSegments: Array[String] = CarbonDataMergerUtil
+ .getValidSegments(loadsToMerge).split(',')
+ val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
+ val carbonMergerMapping = CarbonMergerMapping(tablePath,
+ carbonTable.getMetaDataFilepath,
+ mergedLoadName,
+ databaseName,
+ factTableName,
+ validSegments,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+ compactionType,
+ maxSegmentColCardinality = null,
+ maxSegmentColumnSchemaList = null
+ )
+ carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
+ carbonLoadModel.setLoadMetadataDetails(
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
+ // trigger event for compaction
+ val operationContext = new OperationContext
+ val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+ AlterTableCompactionPreEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+
+ var execInstance = "1"
+ // In case of non-dynamic executor allocation, the number of executors is fixed.
+ if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
+ execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
+ LOGGER.info(s"spark.executor.instances property is set to = $execInstance")
+ } // In case of dynamic executor allocation, take the max executors of the dynamic allocation.
+ else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
+ if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
+ .equalsIgnoreCase("true")) {
+ execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
+ LOGGER.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
+ }
+ }
+
+ val mergeStatus =
+ if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
+ new CarbonIUDMergerRDD(
+ sc.sparkContext,
+ new MergeResultImpl(),
+ carbonLoadModel,
+ carbonMergerMapping,
+ execInstance
+ ).collect
+ } else {
+ new CarbonMergerRDD(
+ sc.sparkContext,
+ new MergeResultImpl(),
+ carbonLoadModel,
+ carbonMergerMapping,
+ execInstance
+ ).collect
+ }
+
+ if (mergeStatus.length == 0) {
+ finalMergeStatus = false
+ } else {
+ finalMergeStatus = mergeStatus.forall(_._2)
+ }
+
+ if (finalMergeStatus) {
+ val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
+ CommonUtil.mergeIndexFiles(
+ sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false)
+
+ // trigger event for compaction
+ val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
+ AlterTableCompactionPostEvent(carbonTable, carbonMergerMapping, mergedLoadName, sc)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
+
+ val endTime = System.nanoTime()
+ LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
+ val statusFileUpdation =
+ ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
+ CarbonDataMergerUtil
+ .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
+ carbonTable.getMetaDataFilepath,
+ carbonLoadModel)) ||
+ CarbonDataMergerUtil
+ .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
+ mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType)
+
+ if (!statusFileUpdation) {
+ LOGGER.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ throw new Exception(s"Compaction failed to update metadata for table" +
+ s" ${ carbonLoadModel.getDatabaseName }." +
+ s"${ carbonLoadModel.getTableName }")
+ } else {
+ LOGGER.audit(s"Compaction request completed for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ LOGGER.info(s"Compaction request completed for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ }
+ } else {
+ LOGGER.audit(s"Compaction request failed for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }"
+ )
+ LOGGER.error(s"Compaction request failed for table " +
+ s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+ throw new Exception("Compaction Failure in Merger Rdd.")
+ }
+ }
+
+}
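To make the segment naming in the two compactors concrete: the merged load name carries the new load number after an underscore (the "Segment_" prefix below is an assumption consistent with the split("_")(1) in AggregateDataMapCompactor), and minor compaction then stacks levels as in the 0.1/4.1/8.1 example above:

    // Sketch only; the exact prefix of the merged load name is an assumption.
    def loadNumberOf(mergedLoadName: String): String = mergedLoadName.split("_")(1)

    // Level 1 minor compaction merges loads 0..3 into 0.1, 4..7 into 4.1, 8..11 into 8.1;
    // level 2 then merges 0.1, 4.1 and 8.1 into 0.2 when the threshold allows.
    assert(loadNumberOf("Segment_0.1") == "0.1")
    assert(loadNumberOf("Segment_0.2") == "0.2")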
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
new file mode 100644
index 0000000..6060f06
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CompactionFactory.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.CompactionModel
+
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CompactionFactory {
+
+ /**
+ * Returns the appropriate Compactor implementation.
+ */
+ def getCompactor(carbonLoadModel: CarbonLoadModel,
+ compactionModel: CompactionModel,
+ executor: ExecutorService,
+ sqlContext: SQLContext,
+ storeLocation: String): Compactor = {
+ if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap) {
+ new AggregateDataMapCompactor(
+ carbonLoadModel,
+ compactionModel,
+ executor,
+ sqlContext,
+ storeLocation)
+ } else {
+ new CarbonTableCompactor(
+ carbonLoadModel,
+ compactionModel,
+ executor,
+ sqlContext,
+ storeLocation)
+ }
+ }
+}
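Taken together, call sites now go through the factory; a self-contained sketch of the pattern used in CarbonDataRDDFactory above (the models are assumed to have been built elsewhere):

    import java.util.concurrent.ExecutorService

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.command.CompactionModel

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    import org.apache.carbondata.spark.rdd.CompactionFactory

    def runCompaction(loadModel: CarbonLoadModel, compactionModel: CompactionModel,
        executor: ExecutorService, sqlContext: SQLContext, storeLocation: String): Unit = {
      val compactor = CompactionFactory.getCompactor(
        loadModel, compactionModel, executor, sqlContext, storeLocation)
      try {
        // Dispatches to AggregateDataMapCompactor for child data maps,
        // CarbonTableCompactor otherwise.
        compactor.executeCompaction()
      } finally {
        // Clean up stale/partial segment folders, as the callers above do.
        compactor.deletePartialLoadsInCompaction()
      }
    }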
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
new file mode 100644
index 0000000..6fafc95
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import java.util.concurrent.ExecutorService
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.execution.command.CompactionModel
+
+import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+abstract class Compactor(carbonLoadModel: CarbonLoadModel,
+ compactionModel: CompactionModel,
+ executor: ExecutorService,
+ sqlContext: SQLContext,
+ storeLocation: String) {
+
+ val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+ def executeCompaction(): Unit
+
+ def identifySegmentsToBeMerged(): java.util.List[LoadMetadataDetails] = {
+ CarbonDataMergerUtil
+ .identifySegmentsToBeMerged(carbonLoadModel,
+ compactionModel.compactionSize,
+ carbonLoadModel.getLoadMetadataDetails,
+ compactionModel.compactionType)
+ }
+
+ def deletePartialLoadsInCompaction(): Unit = {
+ // Delete any partially loaded data if present.
+ // In some cases the segment folder present in the store has no entry in the table
+ // status file, so delete those folders as well.
+ try {
+ CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
+ } catch {
+ case e: Exception =>
+ LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
+ s" ${ e.getMessage }")
+ }
+ }
+
+}
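The base class leaves only executeCompaction() abstract, so a third implementation would need little beyond it; a hypothetical no-op subclass for illustration:

    import java.util.concurrent.ExecutorService

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.execution.command.CompactionModel

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    import org.apache.carbondata.spark.rdd.Compactor

    // Hypothetical compactor illustrating the template the base class provides.
    class LoggingCompactor(carbonLoadModel: CarbonLoadModel,
        compactionModel: CompactionModel,
        executor: ExecutorService,
        sqlContext: SQLContext,
        storeLocation: String)
      extends Compactor(carbonLoadModel, compactionModel, executor, sqlContext, storeLocation) {

      override def executeCompaction(): Unit = {
        // identifySegmentsToBeMerged() and LOGGER are inherited from Compactor.
        val loads = identifySegmentsToBeMerged()
        LOGGER.info(s"would merge ${ loads.size() } segments")
      }
    }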
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 0cb6ca6..a9b5455 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -259,7 +259,7 @@ object CarbonSession {
def initListeners(): Unit = {
OperationListenerBus.getInstance()
.addListener(classOf[DropTablePostEvent], DataMapDropTablePostListener)
- .addListener(classOf[LoadTablePostExecutionEvent], LoadPostAggregateListener)
+ .addListener(classOf[LoadTablePreStatusUpdateEvent], LoadPostAggregateListener)
.addListener(classOf[DeleteSegmentByIdPreEvent], PreAggregateDeleteSegmentByIdPreListener)
.addListener(classOf[DeleteSegmentByDatePreEvent], PreAggregateDeleteSegmentByDatePreListener)
.addListener(classOf[UpdateTablePreEvent], UpdatePreAggregatePreListener)
@@ -271,5 +271,7 @@ object CarbonSession {
.addListener(classOf[AlterTableAddColumnPreEvent], PreAggregateAddColumnsPreListener)
.addListener(classOf[DropDataMapPostEvent], DropDataMapPostListener)
.addListener(classOf[LoadTablePreExecutionEvent], LoadPreAggregateTablePreListener)
+ .addListener(classOf[AlterTableCompactionPostEvent],
+ AlterPreAggregateTableCompactionPostListener)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index e761bea..f642785 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -45,6 +45,7 @@ import org.apache.carbondata.format
import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.exception.NoRetryException
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.merger.CompactionType
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
@@ -128,8 +129,9 @@ case class CarbonLoadDataCommand(
CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser), hadoopConf)
}
carbonLoadModel.setFactFilePath(factPath)
- carbonLoadModel.setAggLoadRequest(internalOptions
- .getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
+ carbonLoadModel.setAggLoadRequest(
+ internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
+ carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
DataLoadingUtil.buildCarbonLoadModel(
table,
carbonProperty,
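The two ends of the new internal option meet as follows: the aggregate compactor passes the merged segment name in via internalOptions, and CarbonLoadDataCommand seeds the load model with it, defaulting to an empty string for user-triggered loads. A condensed sketch (the sample segment name 0.1 is illustrative):

    import org.apache.carbondata.core.constants.CarbonCommonConstants

    // Producer side (AggregateDataMapCompactor):
    val internalOptions = Map(
      CarbonCommonConstants.IS_INTERNAL_LOAD_CALL -> "true",
      "mergedSegmentName" -> "0.1") // sample merged load name

    // Consumer side (CarbonLoadDataCommand): empty for ordinary loads, so the
    // compaction flow is the only one that pre-assigns a segment id.
    val segmentId = internalOptions.getOrElse("mergedSegmentName", "")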
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2304303c/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 90b728d..9168247 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -19,14 +19,18 @@ package org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.JavaConverters._
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonLoadDataCommand}
+import org.apache.spark.sql.execution.command.AlterTableModel
import org.apache.spark.sql.CarbonSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.events._
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
object LoadPostAggregateListener extends OperationEventListener {
/**
@@ -35,7 +39,7 @@ object LoadPostAggregateListener extends OperationEventListener {
* @param event
*/
override def onEvent(event: Event, operationContext: OperationContext): Unit = {
- val loadEvent = event.asInstanceOf[LoadTablePostExecutionEvent]
+ val loadEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
val sparkSession = loadEvent.sparkSession
val carbonLoadModel = loadEvent.carbonLoadModel
val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -55,6 +59,36 @@ object LoadPostAggregateListener extends OperationEventListener {
}
}
+/**
+ * Listener to handle the operations that have to be done after compaction of a table has
+ * finished.
+ */
+object AlterPreAggregateTableCompactionPostListener extends OperationEventListener {
+ /**
+ * Called on a specified event occurrence
+ *
+ * @param event
+ * @param operationContext
+ */
+ override def onEvent(event: Event, operationContext: OperationContext): Unit = {
+ val compactionEvent = event.asInstanceOf[AlterTableCompactionPostEvent]
+ val carbonTable = compactionEvent.carbonTable
+ val compactionType = compactionEvent.carbonMergerMapping.campactionType
+ val sparkSession = compactionEvent.sQLContext.sparkSession
+ if (carbonTable.hasDataMapSchema) {
+ carbonTable.getTableInfo.getDataMapSchemaList.asScala.foreach { dataMapSchema =>
+ val childRelationIdentifier = dataMapSchema.getRelationIdentifier
+ val alterTableModel = AlterTableModel(Some(childRelationIdentifier.getDatabaseName),
+ childRelationIdentifier.getTableName,
+ None,
+ compactionType.toString,
+ Some(System.currentTimeMillis()),
+ "")
+ CarbonAlterTableCompactionCommand(alterTableModel).run(sparkSession)
+ }
+ }
+ }
+}
+
object LoadPreAggregateTablePreListener extends OperationEventListener {
/**
* Called on a specified event occurrence