You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:24 UTC
[17/50] [abbrv] carbondata git commit: [CARBONDATA-2064] Add compaction listener
[CARBONDATA-2064] Add compaction listener
This closes #1847
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/54a381c2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/54a381c2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/54a381c2
Branch: refs/heads/branch-1.3
Commit: 54a381c27024ece07d400a4a1d36917bd3ca09f9
Parents: 1202e20
Author: dhatchayani <dh...@gmail.com>
Authored: Tue Jan 23 15:26:26 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Feb 1 22:20:33 2018 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 7 -
.../hadoop/api/CarbonOutputCommitter.java | 32 ++--
.../sdv/generated/MergeIndexTestCase.scala | 30 ++--
.../CarbonIndexFileMergeTestCase.scala | 48 +++---
.../dataload/TestGlobalSortDataLoad.scala | 2 +-
.../StandardPartitionTableLoadingTestCase.scala | 5 -
.../carbondata/events/AlterTableEvents.scala | 14 +-
.../spark/rdd/CarbonMergeFilesRDD.scala | 84 ----------
.../carbondata/spark/util/CommonUtil.scala | 51 ------
.../spark/rdd/CarbonDataRDDFactory.scala | 14 --
.../spark/rdd/CarbonTableCompactor.scala | 2 -
.../CarbonAlterTableCompactionCommand.scala | 165 +++++++++----------
.../sql/execution/strategy/DDLStrategy.scala | 17 --
.../CarbonGetTableDetailComandTestCase.scala | 6 +-
.../processing/loading/events/LoadEvents.java | 12 ++
.../processing/merger/CompactionType.java | 1 -
16 files changed, 155 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 77e8db8..7ae3034 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1478,13 +1478,6 @@ public final class CarbonCommonConstants {
public static final String BITSET_PIPE_LINE_DEFAULT = "true";
- /**
- * It is internal configuration and used only for test purpose.
- * It will merge the carbon index files with in the segment to single segment.
- */
- public static final String CARBON_MERGE_INDEX_IN_SEGMENT = "carbon.merge.index.in.segment";
-
- public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler";
/*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 9cca1bb..555ddd2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -25,18 +25,15 @@ import java.util.Set;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.metadata.PartitionMapFileStore;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonSessionInfo;
import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
import org.apache.carbondata.events.OperationContext;
import org.apache.carbondata.events.OperationListenerBus;
import org.apache.carbondata.processing.loading.events.LoadEvents;
@@ -126,7 +123,16 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
}
}
CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
- mergeCarbonIndexFiles(segmentPath);
+ if (operationContext != null) {
+ LoadEvents.LoadTableMergePartitionEvent loadTableMergePartitionEvent =
+ new LoadEvents.LoadTableMergePartitionEvent(segmentPath);
+ try {
+ OperationListenerBus.getInstance()
+ .fireEvent(loadTableMergePartitionEvent, (OperationContext) operationContext);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
String updateTime =
context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
String segmentsToBeDeleted =
@@ -158,24 +164,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
}
/**
- * Merge index files to a new single file.
- */
- private void mergeCarbonIndexFiles(String segmentPath) throws IOException {
- boolean mergeIndex = false;
- try {
- mergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
- } catch (Exception e) {
- mergeIndex = Boolean.parseBoolean(
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT);
- }
- if (mergeIndex) {
- new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath);
- }
- }
-
- /**
* Update the tablestatus as fail if any fail happens.
*
* @param context
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index cb0d02c..8e71257 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
/**
* Test Class for AlterTableTestCase to verify all scenerios
@@ -40,34 +41,30 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
override protected def afterAll(): Unit = {
sql("DROP TABLE IF EXISTS nonindexmerge")
sql("DROP TABLE IF EXISTS indexmerge")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
}
test("Verify correctness of index merge sdv") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql(s"""drop table if exists carbon_automation_nonmerge""").collect
sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_nonmerge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSy
sVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("DROP TABLE IF EXISTS carbon_automation_merge")
sql(s"""create table carbon_automation_merge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADP
artitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
sql("""Select count(*) from carbon_automation_merge"""))
}
test("Verify command of index merge sdv") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql(s"""drop table if exists carbon_automation_nonmerge""").collect
sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -77,17 +74,18 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
val rows = sql("""Select count(*) from carbon_automation_nonmerge""").collect()
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
- sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect()
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
}
test("Verify index index merge with compaction sdv") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql(s"""drop table if exists carbon_automation_nonmerge""").collect
sql(s"""create table carbon_automation_nonmerge (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phoneP
ADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string,gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId,Latest_YEAR,Latest_MONTH,Latest_DAY')""").collect
@@ -99,9 +97,11 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0)
checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index c66107f..895b0b5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -20,12 +20,11 @@ package org.apache.carbondata.spark.testsuite.datacompaction
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
class CarbonIndexFileMergeTestCase
extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -40,13 +39,9 @@ class CarbonIndexFileMergeTestCase
CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
sql("DROP TABLE IF EXISTS nonindexmerge")
sql("DROP TABLE IF EXISTS indexmerge")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
}
test("Verify correctness of index merge") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql("DROP TABLE IF EXISTS nonindexmerge")
sql(
"""
@@ -57,8 +52,6 @@ class CarbonIndexFileMergeTestCase
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("DROP TABLE IF EXISTS indexmerge")
sql(
"""
@@ -68,14 +61,16 @@ class CarbonIndexFileMergeTestCase
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
assert(getIndexFileCount("default_indexmerge", "0") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""),
sql("""Select count(*) from indexmerge"""))
}
test("Verify command of index merge") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql("DROP TABLE IF EXISTS nonindexmerge")
sql(
"""
@@ -90,17 +85,18 @@ class CarbonIndexFileMergeTestCase
val rows = sql("""Select count(*) from nonindexmerge""").collect()
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
- sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
test("Verify command of index merge without enabling property") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql("DROP TABLE IF EXISTS nonindexmerge")
sql(
"""
@@ -115,15 +111,18 @@ class CarbonIndexFileMergeTestCase
val rows = sql("""Select count(*) from nonindexmerge""").collect()
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
- sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
test("Verify index index merge with compaction") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql("DROP TABLE IF EXISTS nonindexmerge")
sql(
"""
@@ -141,16 +140,16 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
test("Verify index index merge for compacted segments") {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql("DROP TABLE IF EXISTS nonindexmerge")
sql(
"""
@@ -172,7 +171,10 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
+ val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
+ new CarbonIndexFileMergeWriter()
+ .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 50a38f1..0d9e0fd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -273,7 +273,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
val segmentDir = carbonTablePath.getSegmentDir("0", "0")
- assertResult(Math.max(4, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+ assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
}
test("Query with small files") {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 31d2598..16f252b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -319,8 +319,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
}
test("merge carbon index disable data loading for partition table for three partition column") {
- CarbonProperties.getInstance.addProperty(
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
sql(
"""
| CREATE TABLE mergeindexpartitionthree (empno int, doj Timestamp,
@@ -340,9 +338,6 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
val files = carbonFile.listFiles(new CarbonFileFilter {
override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName)
})
- CarbonProperties.getInstance.addProperty(
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
assert(files.length == 10)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
index ca1948a..671e132 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/AlterTableEvents.scala
@@ -17,7 +17,7 @@
package org.apache.carbondata.events
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel, CarbonMergerMapping}
+import org.apache.spark.sql.execution.command._
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -203,3 +203,15 @@ case class AlterTableCompactionAbortEvent(sparkSession: SparkSession,
carbonTable: CarbonTable,
carbonMergerMapping: CarbonMergerMapping,
mergedLoadName: String) extends Event with AlterTableCompactionEventInfo
+
+
+/**
+ * Compaction Event for handling exception in compaction
+ *
+ * @param sparkSession
+ * @param carbonTable
+ * @param alterTableModel
+ */
+case class AlterTableCompactionExceptionEvent(sparkSession: SparkSession,
+ carbonTable: CarbonTable,
+ alterTableModel: AlterTableModel) extends Event with AlterTableCompactionEventInfo
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
deleted file mode 100644
index 1087ea7..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
-
-case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentPath: String)
- extends Partition {
-
- override val index: Int = idx
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
-
-/**
- * RDD to merge all carbonindex files of each segment to carbonindex file into the same segment.
- * @param sc
- * @param tablePath
- * @param segments segments to be merged
- */
-class CarbonMergeFilesRDD(
- sc: SparkContext,
- tablePath: String,
- segments: Seq[String],
- readFileFooterFromCarbonDataFile: Boolean)
- extends CarbonRDD[String](sc, Nil) {
-
- override def getPartitions: Array[Partition] = {
- segments.zipWithIndex.map {s =>
- CarbonMergeFilePartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
- }.toArray
- }
-
- override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
- val iter = new Iterator[String] {
- val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
- logInfo("Merging carbon index files of segment : " + split.segmentPath)
-
- new CarbonIndexFileMergeWriter()
- .mergeCarbonIndexFilesOfSegment(split.segmentPath, readFileFooterFromCarbonDataFile)
-
- var havePair = false
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished && !havePair) {
- finished = true
- havePair = !finished
- }
- !finished
- }
-
- override def next(): String = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- ""
- }
-
- }
- iter
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d96a051..b44a0fb 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -55,7 +55,6 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
object CommonUtil {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -891,54 +890,4 @@ object CommonUtil {
(Integer.parseInt(scaleAndPrecision(0).trim), Integer.parseInt(scaleAndPrecision(1).trim))
}
- /**
- * Merge the carbonindex files with in the segment to carbonindexmerge file inside same segment
- *
- * @param sparkContext
- * @param segmentIds
- * @param tablePath
- * @param carbonTable
- * @param mergeIndexProperty
- * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
- * file. This will used in case of upgrade from version
- * which do not store the blocklet info to current version
- */
- def mergeIndexFiles(sparkContext: SparkContext,
- segmentIds: Seq[String],
- tablePath: String,
- carbonTable: CarbonTable,
- mergeIndexProperty: Boolean,
- readFileFooterFromCarbonDataFile: Boolean = false): Unit = {
- if (mergeIndexProperty) {
- new CarbonMergeFilesRDD(
- sparkContext,
- carbonTable.getTablePath,
- segmentIds,
- readFileFooterFromCarbonDataFile).collect()
- } else {
- try {
- CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT).toBoolean
- if (CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
- CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
- new CarbonMergeFilesRDD(
- sparkContext,
- carbonTable.getTablePath,
- segmentIds,
- readFileFooterFromCarbonDataFile).collect()
- }
- } catch {
- case _: Exception =>
- if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
- new CarbonMergeFilesRDD(
- sparkContext,
- carbonTable.getTablePath,
- segmentIds,
- readFileFooterFromCarbonDataFile).collect()
- }
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 3de0e70..5c43d58 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -103,18 +103,6 @@ object CarbonDataRDDFactory {
LOGGER.info(s"Acquired the compaction lock for table ${ carbonLoadModel.getDatabaseName }" +
s".${ carbonLoadModel.getTableName }")
try {
- if (compactionType == CompactionType.SEGMENT_INDEX) {
- // Just launch job to merge index and return
- CommonUtil.mergeIndexFiles(
- sqlContext.sparkContext,
- CarbonDataMergerUtil.getValidSegmentList(
- carbonTable.getAbsoluteTableIdentifier).asScala,
- carbonLoadModel.getTablePath,
- carbonTable,
- true)
- lock.unlock()
- return
- }
startCompactionThreads(
sqlContext,
carbonLoadModel,
@@ -359,8 +347,6 @@ object CarbonDataRDDFactory {
} else {
loadDataFile(sqlContext, carbonLoadModel, hadoopConf)
}
- CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
- Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
if (status.nonEmpty) {
status.foreach { eachLoadStatus =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 8406d8d..bfe4e41 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -221,8 +221,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
if (finalMergeStatus) {
val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
- CommonUtil.mergeIndexFiles(
- sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false)
new PartitionMapFileStore().mergePartitionMapFiles(
CarbonTablePath.getSegmentPath(tablePath, mergedLoadNumber),
carbonLoadModel.getFactTimeStamp + "")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index fb0f9fe..2a77826 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -34,16 +34,17 @@ import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
+import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.CommonUtil
import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -90,52 +91,74 @@ case class CarbonAlterTableCompactionCommand(
LogServiceFactory.getLogService(this.getClass.getName)
val tableName = alterTableModel.tableName.toLowerCase
val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
- val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
- if (isLoadInProgress) {
- val message = "Cannot run data loading and compaction on same table concurrently. " +
- "Please wait for load to finish"
- LOGGER.error(message)
- throw new ConcurrentOperationException(message)
- }
- val carbonLoadModel = new CarbonLoadModel()
- carbonLoadModel.setTableName(table.getTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- carbonLoadModel.setTableName(table.getTableName)
- carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTablePath(table.getTablePath)
-
- var storeLocation = CarbonProperties.getInstance.getProperty(
- CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
- System.getProperty("java.io.tmpdir"))
- storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
- // trigger event for compaction
- val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
- AlterTableCompactionPreEvent(sparkSession, table, null, null)
- OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+ operationContext.setProperty("compactionException", "true")
+ var compactionType: CompactionType = null
+ var compactionException = "true"
try {
- alterTableForCompaction(
- sparkSession.sqlContext,
- alterTableModel,
- carbonLoadModel,
- storeLocation,
- operationContext)
+ compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
} catch {
- case e: Exception =>
- if (null != e.getMessage) {
- CarbonException.analysisException(
- s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
- } else {
- CarbonException.analysisException(
- "Exception in compaction. Please check logs for more info.")
- }
+ case _: Exception =>
+ val alterTableCompactionExceptionEvent: AlterTableCompactionExceptionEvent =
+ AlterTableCompactionExceptionEvent(sparkSession, table, alterTableModel)
+ OperationListenerBus.getInstance
+ .fireEvent(alterTableCompactionExceptionEvent, operationContext)
+ compactionException = operationContext.getProperty("compactionException").toString
+ }
+
+ if (compactionException.equalsIgnoreCase("true") && null == compactionType) {
+ throw new MalformedCarbonCommandException(
+ "Unsupported alter operation on carbon table")
+ } else if (compactionException.equalsIgnoreCase("false")) {
+ Seq.empty
+ } else {
+ val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
+ if (isLoadInProgress) {
+ val message = "Cannot run data loading and compaction on same table concurrently. " +
+ "Please wait for load to finish"
+ LOGGER.error(message)
+ throw new ConcurrentOperationException(message)
+ }
+
+ val carbonLoadModel = new CarbonLoadModel()
+ carbonLoadModel.setTableName(table.getTableName)
+ val dataLoadSchema = new CarbonDataLoadSchema(table)
+ // Need to fill dimension relation
+ carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+ carbonLoadModel.setTableName(table.getTableName)
+ carbonLoadModel.setDatabaseName(table.getDatabaseName)
+ carbonLoadModel.setTablePath(table.getTablePath)
+
+ var storeLocation = CarbonProperties.getInstance.getProperty(
+ CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
+ System.getProperty("java.io.tmpdir"))
+ storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+ // trigger event for compaction
+ val alterTableCompactionPreEvent: AlterTableCompactionPreEvent =
+ AlterTableCompactionPreEvent(sparkSession, table, null, null)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPreEvent, operationContext)
+ try {
+ alterTableForCompaction(
+ sparkSession.sqlContext,
+ alterTableModel,
+ carbonLoadModel,
+ storeLocation,
+ operationContext)
+ } catch {
+ case e: Exception =>
+ if (null != e.getMessage) {
+ CarbonException.analysisException(
+ s"Compaction failed. Please check logs for more info. ${ e.getMessage }")
+ } else {
+ CarbonException.analysisException(
+ "Exception in compaction. Please check logs for more info.")
+ }
+ }
+ // trigger event for compaction
+ val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
+ AlterTableCompactionPostEvent(sparkSession, table, null, null)
+ OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
+ Seq.empty
}
- // trigger event for compaction
- val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =
- AlterTableCompactionPostEvent(sparkSession, table, null, null)
- OperationListenerBus.getInstance.fireEvent(alterTableCompactionPostEvent, operationContext)
- Seq.empty
}
private def alterTableForCompaction(sqlContext: SQLContext,
@@ -225,50 +248,14 @@ case class CarbonAlterTableCompactionCommand(
LOGGER.info("Acquired the compaction lock for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
try {
- if (compactionType == CompactionType.SEGMENT_INDEX) {
- // Just launch job to merge index and return
- CommonUtil.mergeIndexFiles(
- sqlContext.sparkContext,
- CarbonDataMergerUtil.getValidSegmentList(
- carbonTable.getAbsoluteTableIdentifier).asScala,
- carbonLoadModel.getTablePath,
- carbonTable,
- mergeIndexProperty = true,
- readFileFooterFromCarbonDataFile = true)
-
- val carbonMergerMapping = CarbonMergerMapping(carbonTable.getTablePath,
- carbonTable.getMetaDataFilepath,
- "",
- carbonTable.getDatabaseName,
- carbonTable.getTableName,
- Array(),
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
- compactionType,
- maxSegmentColCardinality = null,
- maxSegmentColumnSchemaList = null,
- compactionModel.currentPartitions,
- null)
-
- // trigger event for compaction
- val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =
- AlterTableCompactionPreStatusUpdateEvent(sqlContext.sparkSession,
- carbonTable,
- carbonMergerMapping,
- carbonLoadModel,
- "")
- OperationListenerBus.getInstance
- .fireEvent(alterTableCompactionPreStatusUpdateEvent, operationContext)
- lock.unlock()
- } else {
- CarbonDataRDDFactory.startCompactionThreads(
- sqlContext,
- carbonLoadModel,
- storeLocation,
- compactionModel,
- lock,
- operationContext
- )
- }
+ CarbonDataRDDFactory.startCompactionThreads(
+ sqlContext,
+ carbonLoadModel,
+ storeLocation,
+ compactionModel,
+ lock,
+ operationContext
+ )
} catch {
case e: Exception =>
LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index b174b94..83831e3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -100,24 +100,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
.tableExists(TableIdentifier(altertablemodel.tableName,
altertablemodel.dbName))(sparkSession)
if (isCarbonTable) {
- var compactionType: CompactionType = null
- try {
- compactionType = CompactionType.valueOf(altertablemodel.compactionType.toUpperCase)
- } catch {
- case _: Exception =>
- throw new MalformedCarbonCommandException(
- "Unsupported alter operation on carbon table")
- }
- if (CompactionType.MINOR == compactionType ||
- CompactionType.MAJOR == compactionType ||
- CompactionType.SEGMENT_INDEX == compactionType ||
- CompactionType.STREAMING == compactionType ||
- CompactionType.CLOSE_STREAMING == compactionType) {
ExecutedCommandExec(alterTable) :: Nil
- } else {
- throw new MalformedCarbonCommandException(
- "Unsupported alter operation on carbon table")
- }
} else {
throw new MalformedCarbonCommandException(
"Operation not allowed : " + altertablemodel.alterSql)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 6265d0d..48733dc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -42,10 +42,10 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
assertResult(2)(result.length)
assertResult("table_info1")(result(0).getString(0))
- // 2143 is the size of carbon table
- assertResult(2143)(result(0).getLong(1))
+ // 2096 is the size of carbon table
+ assertResult(2096)(result(0).getLong(1))
assertResult("table_info2")(result(1).getString(0))
- assertResult(2143)(result(1).getLong(1))
+ assertResult(2096)(result(1).getLong(1))
}
override def afterAll: Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
index 190c72c..a3fa292 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/events/LoadEvents.java
@@ -181,4 +181,16 @@ public class LoadEvents {
return carbonLoadModel;
}
}
+
+ public static class LoadTableMergePartitionEvent extends Event {
+ private String segmentPath;
+
+ public LoadTableMergePartitionEvent(String segmentPath) {
+ this.segmentPath = segmentPath;
+ }
+
+ public String getSegmentPath() {
+ return segmentPath;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/54a381c2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
index 39f56a2..9ed87fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -27,7 +27,6 @@ public enum CompactionType {
MAJOR,
IUD_UPDDEL_DELTA,
IUD_DELETE_DELTA,
- SEGMENT_INDEX,
STREAMING,
CLOSE_STREAMING,
NONE