Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/16 02:56:25 UTC
[2/3] carbondata git commit: [CARBONDATA-2428] Support flat folder for managed carbon table
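
For context, the flat folder layout is opt-in through a table property, exercised by the new FlatFolderTableLoadingTestCase in this patch. A minimal usage sketch (table and column names are illustrative, drawn from the tests below):

    sql(
      """
        | CREATE TABLE flatfolder (empname String, salary int, empno int)
        | STORED BY 'org.apache.carbondata.format'
        | tblproperties('flat_folder'='true')
      """.stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/data.csv' INTO TABLE flatfolder")
    // With 'flat_folder'='true', .carbondata files are written directly under
    // the table path instead of under Fact/Part0/Segment_<n>.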
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 99b536c..b4937e6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,12 +17,15 @@
package org.apache.carbondata.spark.testsuite.datacompaction
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
@@ -61,7 +64,7 @@ class CarbonIndexFileMergeTestCase
""".stripMargin)
sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
s"'GLOBAL_SORT_PARTITIONS'='100')")
- val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "indexmerge")
new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
assert(getIndexFileCount("default_indexmerge", "0") == 0)
@@ -84,7 +87,7 @@ class CarbonIndexFileMergeTestCase
val rows = sql("""Select count(*) from nonindexmerge""").collect()
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
- val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
new CarbonIndexFileMergeWriter(table)
@@ -109,7 +112,7 @@ class CarbonIndexFileMergeTestCase
val rows = sql("""Select count(*) from nonindexmerge""").collect()
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
- val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
new CarbonIndexFileMergeWriter(table)
@@ -138,7 +141,7 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
@@ -167,7 +170,7 @@ class CarbonIndexFileMergeTestCase
assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
- val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "nonindexmerge")
new CarbonIndexFileMergeWriter(table)
.mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
@@ -190,18 +193,32 @@ class CarbonIndexFileMergeTestCase
sql("select * from mitable").show()
}
- private def getIndexFileCount(tableName: String, segment: String): Int = {
- val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
- val path = CarbonTablePath
- .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
- val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
- override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
- .INDEX_FILE_EXT)
- })
- if (carbonFiles != null) {
- carbonFiles.length
+ private def getIndexFileCount(tableName: String, segmentNo: String): Int = {
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+ if (FileFactory.isFileExist(segmentDir)) {
+ val indexFiles = new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir)
+ indexFiles.asScala.map { f =>
+ if (f._2 == null) {
+ 1
+ } else {
+ 0
+ }
+ }.sum
} else {
- 0
+ val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+ if (segment != null) {
+ val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+ store.getSegmentFile.getLocationMap.values().asScala.map { f =>
+ if (f.getMergeFileName == null) {
+ f.getFiles.size()
+ } else {
+ 0
+ }
+ }.sum
+ } else {
+ 0
+ }
}
}
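
The two-branch lookup above (legacy segment directory when it exists, otherwise the segment-file metadata written for flat folder tables) recurs in several test suites in this patch. A consolidated helper, assembled only from calls visible in this diff (a sketch, not part of the commit), would look like:

    import org.apache.carbondata.core.datamap.Segment
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
    import org.apache.carbondata.core.metadata.SegmentFileStore
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.util.path.CarbonTablePath

    def countIndexFiles(carbonTable: CarbonTable, segmentNo: String): Int = {
      val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
      if (FileFactory.isFileExist(segmentDir)) {
        // pre-flat-folder layout: index files live under Fact/Part0/Segment_<n>
        new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
      } else {
        // flat folder layout: index file locations come from the segment file
        val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
        new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
          .getIndexCarbonFiles.size()
      }
    }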
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
index 8f891ce..d49b962 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -22,9 +22,12 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
val filePath: String = s"$resourcesPath/globalsort"
@@ -527,8 +530,12 @@ class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAf
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
- val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
- segmentNo
- new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+ if (FileFactory.isFileExist(segmentDir)) {
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+ } else {
+ val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+ new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
index 2da1ada..54c19c2 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -23,8 +23,11 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.util.path.CarbonTablePath
class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
val filePath: String = s"$resourcesPath/globalsort"
@@ -567,8 +570,12 @@ class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndA
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
- val store = carbonTable.getAbsoluteTableIdentifier.getTablePath + "/Fact/Part0/Segment_" +
- segmentNo
- new SegmentIndexFileStore().getIndexFilesFromSegment(store).size()
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+ if (FileFactory.isFileExist(segmentDir)) {
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+ } else {
+ val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+ new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index f3e12d1..c695b05 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -26,8 +26,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -189,12 +191,14 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
}
def getIndexfileCount(tableName: String, segmentNo: String = "0"): Int = {
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
- CarbonCommonConstants.DATABASE_DEFAULT_NAME,
- tableName
- )
- val segmentDir = carbonTable.getSegmentPath(segmentNo)
- new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
+ if (FileFactory.isFileExist(segmentDir)) {
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+ } else {
+ val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+ new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+ }
}
override def afterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index b9d8e12..39785a3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -17,6 +17,8 @@
package org.apache.carbondata.spark.testsuite.dataload
+import scala.collection.JavaConverters._
+
import java.io.{File, FilenameFilter}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -26,7 +28,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
var originVersion = ""
@@ -49,12 +53,20 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
val indexReader = new CarbonIndexFileReader()
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
- val carbonIndexPaths = new File(segmentDir)
- .listFiles(new FilenameFilter {
- override def accept(dir: File, name: String): Boolean = {
- name.endsWith(CarbonTablePath.getCarbonIndexExtension)
- }
- })
+
+ val carbonIndexPaths = if (FileFactory.isFileExist(segmentDir)) {
+ new File(segmentDir)
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.endsWith(CarbonTablePath.getCarbonIndexExtension)
+ }
+ })
+ } else {
+ val segment = Segment.getSegment("0", carbonTable.getTablePath)
+ val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+ store.readIndexFiles()
+ store.getIndexCarbonFiles.asScala.map(f => new File(f.getAbsolutePath)).toArray
+ }
for (carbonIndexPath <- carbonIndexPaths) {
indexReader.openThriftReader(carbonIndexPath.getCanonicalPath)
assert(indexReader.readIndexHeader().getVersion === 3)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index bba75ad..d7b1172 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -17,6 +17,8 @@
package org.apache.carbondata.spark.testsuite.dataload
+import scala.collection.JavaConverters._
+
import java.io.{File, FileWriter}
import org.apache.commons.io.FileUtils
@@ -31,8 +33,10 @@ import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.spark.rdd.CarbonScanRDD
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -273,7 +277,15 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, "0")
- assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+ if (FileFactory.isFileExist(segmentDir)) {
+ assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
+ } else {
+ val segment = Segment.getSegment("0", carbonTable.getTablePath)
+ val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+ store.readIndexFiles()
+ val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
+ assertResult(Math.max(7, defaultParallelism) + 1)(size + store.getIndexFilesMap.size())
+ }
}
test("Query with small files") {
@@ -379,6 +391,11 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNo)
- new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+ if (FileFactory.isFileExist(segmentDir)) {
+ new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
+ } else {
+ val segment = Segment.getSegment(segmentNo, carbonTable.getTablePath)
+ new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName).getIndexCarbonFiles.size()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index b5c3df1..074c807 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -64,8 +64,9 @@ class CGDataMapFactory(
* Get the datamap for segmentid
*/
override def getDataMaps(segment: Segment): java.util.List[CoarseGrainDataMap] = {
- val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)
- val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+ val path = identifier.getTablePath
+ val file = FileFactory.getCarbonFile(
+ path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
val files = file.listFiles()
files.map {f =>
@@ -100,8 +101,9 @@ class CGDataMapFactory(
* @return
*/
override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
- val path = CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo)
- val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+ val path = identifier.getTablePath
+ val file = FileFactory.getCarbonFile(
+ path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
val files = file.listFiles()
files.map { f =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index 2d666c3..08d8911 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -89,8 +89,9 @@ class FGDataMapFactory(carbonTable: CarbonTable,
* @return
*/
override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
- val path = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segment.getSegmentNo)
- val file = FileFactory.getCarbonFile(path+ "/" +dataMapSchema.getDataMapName)
+ val path = carbonTable.getTablePath
+ val file = FileFactory.getCarbonFile(
+ path+ "/" +dataMapSchema.getDataMapName + "/" + segment.getSegmentNo)
val files = file.listFiles()
files.map { f =>
@@ -416,7 +417,6 @@ class FGDataMapWriter(carbonTable: CarbonTable,
stream.write(bytes)
stream.writeInt(bytes.length)
stream.close()
-// commitFile(fgwritepath)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index eaa2ae7..642607c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -17,6 +17,8 @@
package org.apache.carbondata.spark.testsuite.datamap
+import scala.collection.JavaConverters._
+
import java.io.{File, FilenameFilter}
import org.apache.spark.sql.Row
@@ -26,7 +28,9 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -261,12 +265,21 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
| group by name
""".stripMargin)
assertResult(true)(new File(path).exists())
- assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
- .list(new FilenameFilter {
- override def accept(dir: File, name: String): Boolean = {
- name.contains(CarbonCommonConstants.FACT_FILE_EXT)
- }
- }).length > 0)
+ if (FileFactory.isFileExist(CarbonTablePath.getSegmentPath(path, "0"))) {
+ assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+ .list(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+ }
+ }).length > 0)
+ } else {
+ val segment = Segment.getSegment("0", path)
+ val store = new SegmentFileStore(path, segment.getSegmentFileName)
+ store.readIndexFiles()
+ val size = store.getIndexFilesMap.asScala.map(f => f._2.size()).sum
+ assertResult(true)(size > 0)
+ }
+
checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
sql("drop datamap preagg on table main")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
new file mode 100644
index 0000000..d786d10
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.flatfolder
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
+ override def beforeAll {
+ dropTable
+
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ sql(
+ """
+ | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ }
+
+ def validateDataFiles(tableUniqueName: String, segmentId: String): Unit = {
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ val files = FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles()
+ assert(files.exists(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)))
+ }
+
+ test("data loading for flat folder with global sort") {
+ sql(
+ """
+ | CREATE TABLE flatfolder_gs (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int,empno int)
+ | STORED BY 'org.apache.carbondata.format' tblproperties('sort_scope'='global_sort', 'flat_folder'='true')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_gs OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ validateDataFiles("default_flatfolder_gs", "0")
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from flatfolder_gs order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("data loading for flat folder") {
+ sql(
+ """
+ | CREATE TABLE flatfolder (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int,empno int)
+ | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ validateDataFiles("default_flatfolder", "0")
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from flatfolder order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("data loading for flat folder pre-agg") {
+ sql(
+ """
+ | CREATE TABLE flatfolder_preagg (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int,empno int)
+ | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+ """.stripMargin)
+ sql("create datamap p2 on table flatfolder_preagg using 'preaggregate' as select empname, designation, min(salary) from flatfolder_preagg group by empname, designation ")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_preagg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ validateDataFiles("default_flatfolder_preagg", "0")
+ validateDataFiles("default_flatfolder_preagg_p2", "0")
+
+ checkAnswer(sql("select empname, designation, min(salary) from flatfolder_preagg group by empname, designation"),
+ sql("select empname, designation, min(salary) from originTable group by empname, designation"))
+
+ }
+
+ override def afterAll = {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TASK_DISTRIBUTION ,
+ CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_DEFAULT)
+ dropTable
+ }
+
+ def dropTable = {
+ sql("drop table if exists originTable")
+ sql("drop table if exists flatfolder")
+ sql("drop table if exists flatfolder_gs")
+ sql("drop table if exists flatfolder_preagg")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index ec39f66..2432715 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -26,6 +26,9 @@ import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOp
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
@@ -689,7 +692,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
CarbonCommonConstants.FILE_SEPARATOR + "t" +
CarbonCommonConstants.FILE_SEPARATOR + "Fact" +
CarbonCommonConstants.FILE_SEPARATOR + "Part0")
- assert(f.list().length == 2)
+ if (!FileFactory.isFileExist(
+ CarbonTablePath.getSegmentFilesLocation(
+ dblocation + CarbonCommonConstants.FILE_SEPARATOR +
+ CarbonCommonConstants.FILE_SEPARATOR + "t"))) {
+ assert(f.list().length == 2)
+ }
}
test("test sentences func in update statement") {
sql("drop table if exists senten")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index 0eaaec5..133454a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -16,17 +16,22 @@
*/
package org.apache.carbondata.spark.testsuite.partition
+import scala.collection.JavaConverters._
+
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.datamap.Segment
+
class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
@@ -62,12 +67,20 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Seq[Int]): Unit = {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
val segmentDir = carbonTable.getSegmentPath(segmentId)
- val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
- val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
- override def accept(file: CarbonFile): Boolean = {
- return file.getName.endsWith(".carbondata")
- }
- })
+
+ val dataFiles = if (FileFactory.isFileExist(segmentDir)) {
+ val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+ carbonFile.listFiles(new CarbonFileFilter() {
+ override def accept(file: CarbonFile): Boolean = {
+ return file.getName.endsWith(".carbondata")
+ }
+ })
+ } else {
+ val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
+ val store = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+ store.readIndexFiles()
+ store.getIndexFilesMap.asScala.flatMap(_._2.asScala).map(f => FileFactory.getCarbonFile(f)).toArray
+ }
assert(dataFiles.size == partitions.size)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0422239..f443214 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.processing.sort.sortdata.SortParameters
import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
import org.apache.carbondata.processing.util.{CarbonBadRecordUtil, CarbonDataProcessorUtil}
import org.apache.carbondata.spark.rdd.{NewRddIterator, StringArrayRow}
-import org.apache.carbondata.spark.util.Util
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
object DataLoadProcessorStepOnSpark {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 2ba6e5e..3aaf0ae 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{Partition, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.execution.command.CarbonMergerMapping
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -74,7 +75,8 @@ class CarbonIUDMergerRDD[K, V](
val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
// group blocks by segment.
- val splitsGroupedMySegment = carbonInputSplits.groupBy(_.getSegmentId)
+ val splitsGroupedMySegment =
+ carbonInputSplits.groupBy(_.getSegmentId)
var i = -1
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index d29284f..2fca57e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block._
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
@@ -133,7 +134,7 @@ class CarbonMergerRDD[K, V](
.toList
}
mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.campactionType) {
- tableBlockInfoList.get(0).getSegmentId
+ tableBlockInfoList.get(0).getSegment.toString
} else {
mergedLoadName.substring(
mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
@@ -326,7 +327,9 @@ class CarbonMergerRDD[K, V](
val blockInfo = new TableBlockInfo(entry.getPath.toString,
entry.getStart, entry.getSegmentId,
entry.getLocations, entry.getLength, entry.getVersion,
- updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString, entry.getSegmentId)
+ updateStatusManager.getDeleteDeltaFilePath(
+ entry.getPath.toString,
+ Segment.toSegment(entry.getSegmentId).getSegmentNo)
)
(!updated || (updated && (!CarbonUtil
.isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 77ff139..3995aa7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -530,6 +530,23 @@ object CommonUtil {
}
/**
+ * This method will validate the flat folder property specified by the user
+ *
+ * @param tableProperties
+ */
+ def validateFlatFolder(tableProperties: Map[String, String]): Unit = {
+ val tblPropName = CarbonCommonConstants.FLAT_FOLDER
+ if (tableProperties.get(tblPropName).isDefined) {
+ val trimStr = tableProperties(tblPropName).trim
+ if (!trimStr.equalsIgnoreCase("true") && !trimStr.equalsIgnoreCase("false")) {
+ throw new MalformedCarbonCommandException(s"Invalid $tblPropName value found: " +
+ s"$trimStr, only true|false is supported.")
+ }
+ tableProperties.put(tblPropName, trimStr)
+ }
+ }
+
+ /**
* This method will validate the compaction level threshold property specified by the user
* the property is used while doing minor compaction
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 61a5b42..7d28790 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -301,6 +301,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
CommonUtil.validateTableBlockSize(tableProperties)
// validate table level properties for compaction
CommonUtil.validateTableLevelCompactionProperties(tableProperties)
+ // validate flat folder property.
+ CommonUtil.validateFlatFolder(tableProperties)
TableModel(
ifNotExistPresent,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 84d9c47..fdbf400 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -25,18 +25,23 @@ import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.execution.command.{AlterPartitionModel, DataMapField, Field, PartitionerField}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.PartitionInfo
import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -192,9 +197,13 @@ object PartitionUtils {
val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
- val indexFilePath = CarbonTablePath.getCarbonIndexFilePath(
- tablePath, String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
- timestamp, version)
+ val indexFilePath =
+ new Path(new Path(path).getParent,
+ CarbonTablePath.getCarbonIndexFileName(taskId,
+ bucketNumber.toInt,
+ batchNo,
+ timestamp,
+ segmentId)).toString
// indexFilePath could be duplicated when multiple data files relate to one index file
if (indexFilePath != null && !pathList.contains(indexFilePath)) {
pathList.add(indexFilePath)
@@ -209,11 +218,13 @@ object PartitionUtils {
CarbonUtil.deleteFiles(files.asScala.toArray)
if (!files.isEmpty) {
val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val file = SegmentFileStore.writeSegmentFile(
- identifier.getTablePath,
- alterPartitionModel.segmentId,
- alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
- val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file, null))
+ val updatedSegFile: String = mergeAndUpdateSegmentFile(alterPartitionModel,
+ identifier,
+ segmentId,
+ carbonTable,
+ files.asScala)
+
+ val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, updatedSegFile, null))
.asJava
if (!CarbonUpdateUtil.updateTableMetadataStatus(
new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId,
@@ -283,4 +294,50 @@ object PartitionUtils {
generatePartitionerField(allPartitionColumn.toList, Seq.empty)
}
+
+ private def mergeAndUpdateSegmentFile(alterPartitionModel: AlterPartitionModel,
+ identifier: AbsoluteTableIdentifier,
+ segmentId: String,
+ carbonTable: CarbonTable, filesToBeDelete: Seq[File]) = {
+ val metadataDetails =
+ SegmentStatusManager.readTableStatusFile(
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+ val segmentFile =
+ metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile
+ var allSegmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
+ val file = SegmentFileStore.writeSegmentFile(
+ carbonTable,
+ alterPartitionModel.segmentId,
+ System.currentTimeMillis().toString)
+ if (segmentFile != null) {
+ allSegmentFiles ++= FileFactory.getCarbonFile(
+ SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
+ }
+ val updatedSegFile = {
+ val carbonFile = FileFactory.getCarbonFile(
+ SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
+ allSegmentFiles ++= carbonFile :: Nil
+
+ val mergedSegFileName = SegmentFileStore.genSegmentFileName(
+ segmentId,
+ alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
+ val tmpFile = mergedSegFileName + "_tmp"
+ val segmentStoreFile = SegmentFileStore.mergeSegmentFiles(
+ tmpFile,
+ CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
+ allSegmentFiles.toArray)
+ val indexFiles = segmentStoreFile.getLocationMap.values().asScala.head.getFiles
+ filesToBeDelete.foreach(f => indexFiles.remove(f.getName))
+ SegmentFileStore.writeSegmentFile(
+ segmentStoreFile,
+ CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath) +
+ CarbonCommonConstants.FILE_SEPARATOR + mergedSegFileName + CarbonTablePath.SEGMENT_EXT)
+ carbonFile.delete()
+ FileFactory.getCarbonFile(
+ SegmentFileStore.getSegmentFilePath(
+ carbonTable.getTablePath, tmpFile + CarbonTablePath.SEGMENT_EXT)).delete()
+ mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+ }
+ updatedSegFile
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
index 5902783..f3f2650 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/datamap/IndexDataMapRebuildRDD.scala
@@ -84,9 +84,7 @@ object IndexDataMapRebuildRDD {
segmentId: String): Unit = {
val dataMapStorePath =
- CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) +
- File.separator +
- dataMapName
+ CarbonTablePath.getDataMapStorePath(carbonTable.getTablePath, segmentId, dataMapName)
if (!FileFactory.isFileExist(dataMapStorePath)) {
if (FileFactory.mkdirs(dataMapStorePath, FileFactory.getFileType(dataMapStorePath))) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 21a8641..5d53ccc 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -50,6 +50,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datamap.status.DataMapStatusManager
import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
@@ -434,13 +436,7 @@ object CarbonDataRDDFactory {
segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
}
}
- val segmentFiles = segmentDetails.asScala.map{seg =>
- val file = SegmentFileStore.writeSegmentFile(
- carbonTable.getTablePath,
- seg.getSegmentNo,
- updateModel.get.updatedTimeStamp.toString)
- new Segment(seg.getSegmentNo, file)
- }.filter(_.getSegmentFileName != null).asJava
+ val segmentFiles = updateSegmentFiles(carbonTable, segmentDetails, updateModel.get)
// this means that the update doesn't have any records to update, so no need to do the table
// status file update.
@@ -517,9 +513,13 @@ object CarbonDataRDDFactory {
writeDictionary(carbonLoadModel, result, writeAll = false)
val segmentFileName =
- SegmentFileStore.writeSegmentFile(carbonTable.getTablePath, carbonLoadModel.getSegmentId,
+ SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
String.valueOf(carbonLoadModel.getFactTimeStamp))
+ SegmentFileStore.updateSegmentFile(
+ carbonTable.getTablePath,
+ carbonLoadModel.getSegmentId,
+ segmentFileName)
operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
carbonLoadModel.getSegmentId)
val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
@@ -588,6 +588,58 @@ object CarbonDataRDDFactory {
}
/**
+ * Add and update the segment files. In the update scenario the carbonindex files are written
+ * to the same segment, so the old segment file needs to be updated. This method writes the
+ * latest data to a new segment file and merges it with the old file to get the latest updated
+ * files.
+ * @param carbonTable
+ * @param segmentDetails
+ * @return
+ */
+ private def updateSegmentFiles(
+ carbonTable: CarbonTable,
+ segmentDetails: util.HashSet[Segment],
+ updateModel: UpdateTableModel) = {
+ val metadataDetails =
+ SegmentStatusManager.readTableStatusFile(
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+ val segmentFiles = segmentDetails.asScala.map { seg =>
+ val segmentFile =
+ metadataDetails.find(_.getLoadName.equals(seg.getSegmentNo)).get.getSegmentFile
+ var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
+
+ val file = SegmentFileStore.writeSegmentFile(
+ carbonTable,
+ seg.getSegmentNo,
+ String.valueOf(System.currentTimeMillis()))
+
+ if (segmentFile != null) {
+ segmentFiles ++= FileFactory.getCarbonFile(
+ SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
+ }
+ val updatedSegFile = if (file != null) {
+ val carbonFile = FileFactory.getCarbonFile(
+ SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, file))
+ segmentFiles ++= carbonFile :: Nil
+
+ val mergedSegFileName = SegmentFileStore.genSegmentFileName(
+ seg.getSegmentNo,
+ updateModel.updatedTimeStamp.toString)
+ SegmentFileStore.mergeSegmentFiles(
+ mergedSegFileName,
+ CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
+ segmentFiles.toArray)
+ carbonFile.delete()
+ mergedSegFileName + CarbonTablePath.SEGMENT_EXT
+ } else {
+ null
+ }
+
+ new Segment(seg.getSegmentNo, updatedSegFile)
+ }.filter(_.getSegmentFileName != null).asJava
+ segmentFiles
+ }
+
+ /**
* If data load is triggered by UPDATE query, this func will execute the update
* TODO: move it to a separate update command
*/
@@ -614,10 +666,11 @@ object CarbonDataRDDFactory {
carbonTable.getMetadataPath)
.filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
- val segmentIds = loadMetadataDetails.map(_.getLoadName)
- val segmentIdIndex = segmentIds.zipWithIndex.toMap
- val segmentId2maxTaskNo = segmentIds.map { segId =>
- (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonLoadModel.getTablePath))
+ val segments = loadMetadataDetails.map(f => new Segment(f.getLoadName, f.getSegmentFile))
+ val segmentIdIndex = segments.map(_.getSegmentNo).zipWithIndex.toMap
+ val segmentId2maxTaskNo = segments.map { seg =>
+ (seg.getSegmentNo,
+ CarbonUpdateUtil.getLatestTaskIdForSegment(seg, carbonLoadModel.getTablePath))
}.toMap
class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
@@ -639,10 +692,14 @@ object CarbonDataRDDFactory {
val partitionId = TaskContext.getPartitionId()
val segIdIndex = partitionId / segmentUpdateParallelism
val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
- val segId = segmentIds(segIdIndex)
- val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
- List(triggerDataLoadForSegment(carbonLoadModel, updateModel, segId, newTaskNo, partition)
- .toList).toIterator
+ val segId = segments(segIdIndex)
+ val newTaskNo = segmentId2maxTaskNo(segId.getSegmentNo) + randomPart + 1
+ List(triggerDataLoadForSegment(
+ carbonLoadModel,
+ updateModel,
+ segId.getSegmentNo,
+ newTaskNo,
+ partition).toList).toIterator
}.collect()
}
}
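
The updateSegmentFiles helper added above boils down to three steps per updated segment: write a fresh segment file for the newly written carbonindex files, merge it with the segment's previous segment file, and delete the temporary file. A minimal sketch of that flow, using only the SegmentFileStore and FileFactory calls that appear in this hunk (carbonTable, seg, oldSegmentFileName and updateModel stand in for values from the surrounding scope):

    import org.apache.carbondata.core.datastore.filesystem.CarbonFile
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.metadata.SegmentFileStore
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // 1. write a segment file covering the carbonindex files of this update
    val newFile = SegmentFileStore.writeSegmentFile(
      carbonTable, seg.getSegmentNo, String.valueOf(System.currentTimeMillis()))

    // 2. resolve the old and the new segment files as CarbonFile handles
    val toMerge: Array[CarbonFile] = Seq(oldSegmentFileName, newFile)
      .filter(_ != null)
      .map(f => FileFactory.getCarbonFile(
        SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, f)))
      .toArray

    // 3. merge both into a single file named after the update timestamp;
    //    the patch then deletes the temporary file from step 1 and keeps
    //    mergedName + CarbonTablePath.SEGMENT_EXT as the segment file name
    val mergedName = SegmentFileStore.genSegmentFileName(
      seg.getSegmentNo, updateModel.updatedTimeStamp.toString)
    SegmentFileStore.mergeSegmentFiles(
      mergedName,
      CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
      toMerge)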
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 155bdd1..7605b9d 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -223,7 +223,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
val segmentFilesList = loadsToMerge.asScala.map{seg =>
val file = SegmentFileStore.writeSegmentFile(
- carbonTable.getTablePath,
+ carbonTable,
seg.getLoadName,
carbonLoadModel.getFactTimeStamp.toString)
new Segment(seg.getLoadName, file)
@@ -231,7 +231,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
} else {
segmentFileName = SegmentFileStore.writeSegmentFile(
- carbonTable.getTablePath,
+ carbonTable,
mergedLoadNumber,
carbonLoadModel.getFactTimeStamp.toString)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
index 93c0b4a..30cb464 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSession.scala
@@ -99,7 +99,7 @@ class CarbonSession(@transient val sc: SparkContext,
trySearchMode(qe, sse)
} catch {
case e: Exception =>
- logError(String.format(
+ log.error(String.format(
"Exception when executing search mode: %s", e.getMessage))
throw e;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 0c6d2ba..127e1b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -68,12 +68,7 @@ object DeleteExecution {
val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val isPartitionTable = carbonTable.isHivePartitionTable
- val factPath = if (isPartitionTable) {
- absoluteTableIdentifier.getTablePath
- } else {
- CarbonTablePath.getFactDir(absoluteTableIdentifier.getTablePath)
- }
+ val tablePath = absoluteTableIdentifier.getTablePath
var segmentsTobeDeleted = Seq.empty[Segment]
val deleteRdd = if (isUpdateOperation) {
@@ -114,6 +109,9 @@ object DeleteExecution {
CarbonUpdateUtil
.createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
+ val metadataDetails = SegmentStatusManager.readTableStatusFile(
+ CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
+
val rowContRdd =
sparkSession.sparkContext.parallelize(
blockMappingVO.getCompleteBlockRowDetailVO.asScala.toSeq,
@@ -127,12 +125,16 @@ object DeleteExecution {
var result = List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]()
while (records.hasNext) {
val ((key), (rowCountDetailsVO, groupedRows)) = records.next
+ val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
+ val segmentFile =
+ metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile
result = result ++
deleteDeltaFunc(index,
key,
groupedRows.toIterator,
timestamp,
- rowCountDetailsVO)
+ rowCountDetailsVO,
+ segmentFile)
}
result
}
@@ -219,7 +221,8 @@ object DeleteExecution {
key: String,
iter: Iterator[Row],
timestamp: String,
- rowCountDetailsVO: RowCountDetailsVO
+ rowCountDetailsVO: RowCountDetailsVO,
+ segmentFile: String
): Iterator[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))] = {
val result = new DeleteDelataResultImpl()
@@ -255,7 +258,7 @@ object DeleteExecution {
countOfRows = countOfRows + 1
}
- val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath, isPartitionTable)
+ val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, tablePath, segmentFile != null)
val completeBlockName = CarbonTablePath
.addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
CarbonCommonConstants.FACT_FILE_EXT)
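
With this change the delete flow no longer precomputes a fact directory from isHivePartitionTable; each block path is resolved from whether the block's segment has a segment file recorded in table status. A condensed restatement of the lookup, assuming key and TID as they are used inside deleteDeltaFunc:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.mutate.CarbonUpdateUtil
    import org.apache.carbondata.core.statusmanager.SegmentStatusManager
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // read table status once per delete operation
    val metadataDetails = SegmentStatusManager.readTableStatusFile(
      CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))

    // the partition key is prefixed with the segment id; look up that
    // load's segment file from table status
    val segmentId = key.substring(0, key.indexOf(CarbonCommonConstants.FILE_SEPARATOR))
    val segmentFile =
      metadataDetails.find(_.getLoadName.equals(segmentId)).get.getSegmentFile

    // a non-null segment file switches getTableBlockPath to the new
    // segment-file-based layout instead of the legacy fact directory
    val blockPath =
      CarbonUpdateUtil.getTableBlockPath(TID, tablePath, segmentFile != null)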
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index f3b4be7..857cd81 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -103,6 +103,9 @@ case class PreAggregateTableHelper(
.LOAD_SORT_SCOPE_DEFAULT))
tableProperties
.put(CarbonCommonConstants.TABLE_BLOCKSIZE, parentTable.getBlockSizeInMB.toString)
+ tableProperties.put(CarbonCommonConstants.FLAT_FOLDER,
+ parentTable.getTableInfo.getFactTable.getTableProperties.asScala.getOrElse(
+ CarbonCommonConstants.FLAT_FOLDER, CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
val tableIdentifier =
TableIdentifier(parentTable.getTableName + "_" + dataMapName,
Some(parentTable.getDatabaseName))
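
This makes a pre-aggregate table inherit the parent table's folder layout. As an illustrative end-to-end usage, assuming CarbonCommonConstants.FLAT_FOLDER resolves to the literal property key 'flat_folder' and a Carbon-enabled SparkSession is in scope:

    // hypothetical session code; table and datamap names are made up
    sql(
      """
        | CREATE TABLE sales (id INT, amount DOUBLE)
        | STORED BY 'carbondata'
        | TBLPROPERTIES('flat_folder'='true')
      """.stripMargin)

    // the pre-aggregate child table created here inherits flat_folder=true
    // through the PreAggregateTableHelper change above
    sql(
      """
        | CREATE DATAMAP agg_sales ON TABLE sales
        | USING 'preaggregate'
        | AS SELECT id, sum(amount) FROM sales GROUP BY id
      """.stripMargin)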
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 7d15cc1..617f5e8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -138,6 +138,11 @@ private[sql] case class CarbonDescribeFormattedCommand(
tblProps.get(CarbonCommonConstants.TABLE_ALLOWED_COMPACTION_DAYS),
CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT))
}
+ if (tblProps.containsKey(CarbonCommonConstants.FLAT_FOLDER)) {
+ results ++= Seq((CarbonCommonConstants.FLAT_FOLDER.toUpperCase,
+ tblProps.get(CarbonCommonConstants.FLAT_FOLDER),
+ CarbonCommonConstants.DEFAULT_FLAT_FOLDER))
+ }
results ++= Seq(("", "", ""), ("##Detailed Column property", "", ""))
if (colPropStr.length() > 0) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index 0bdef8a..7cee409 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -20,13 +20,15 @@ package org.apache.carbondata.spark.testsuite.partition
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
+import org.apache.hadoop.fs.Path
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -855,15 +857,24 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
validatePartitionTableFiles(partitions, dataFiles)
}
- def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
- val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
- val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
- val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
- override def accept(file: CarbonFile): Boolean = {
- return file.getName.endsWith(".carbondata")
- }
- })
- dataFiles
+ def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[String] = {
+ val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
+ if (segment.getSegmentFileName != null) {
+ val sfs = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
+ sfs.readIndexFiles()
+ val indexFilesMap = sfs.getIndexFilesMap
+ val dataFiles = indexFilesMap.asScala.flatMap(_._2.asScala).map(f => new Path(f).getName)
+ dataFiles.toArray
+ } else {
+ val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+ val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+ val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+ override def accept(file: CarbonFile): Boolean = {
+ return file.getName.endsWith(".carbondata")
+ }
+ })
+ dataFiles.map(_.getName)
+ }
}
/**
@@ -871,10 +882,10 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
* @param partitions expected partition ids
* @param dataFiles data file names to validate
*/
- def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[CarbonFile]): Unit = {
+ def validatePartitionTableFiles(partitions: Seq[Int], dataFiles: Array[String]): Unit = {
val partitionIds: ListBuffer[Int] = new ListBuffer[Int]()
dataFiles.foreach { dataFile =>
- val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName).split("_")(0).toInt
+ val partitionId = CarbonTablePath.DataFileUtil.getTaskNo(dataFile).split("_")(0).toInt
partitionIds += partitionId
assert(partitions.contains(partitionId))
}
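
The reworked getDataFiles shows the two layouts side by side: when table status records a segment file, data file names are read through the segment file's index-file map; otherwise the legacy segment directory is listed. A compact sketch of the new branch, using only APIs from this hunk:

    import scala.collection.JavaConverters._
    import org.apache.hadoop.fs.Path
    import org.apache.carbondata.core.datamap.Segment
    import org.apache.carbondata.core.metadata.SegmentFileStore

    val segment = Segment.getSegment(segmentId, carbonTable.getTablePath)
    val dataFiles: Array[String] =
      if (segment.getSegmentFileName != null) {
        // new layout: the segment file lists every index file, and each
        // index file maps to the data files it covers
        val sfs = new SegmentFileStore(carbonTable.getTablePath, segment.getSegmentFileName)
        sfs.readIndexFiles()
        sfs.getIndexFilesMap.asScala
          .flatMap(_._2.asScala)
          .map(f => new Path(f).getName)
          .toArray
      } else {
        // legacy layout: *.carbondata files sit directly under the
        // Fact/Part0/Segment_<id> directory and are listed as before
        Array.empty[String]
      }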
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
index 48733dc..1c7cb10 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/sql/CarbonGetTableDetailComandTestCase.scala
@@ -43,9 +43,9 @@ class CarbonGetTableDetailCommandTestCase extends QueryTest with BeforeAndAfterA
assertResult(2)(result.length)
assertResult("table_info1")(result(0).getString(0))
// 2098 is the size of the carbon table
- assertResult(2096)(result(0).getLong(1))
+ assertResult(2098)(result(0).getLong(1))
assertResult("table_info2")(result(1).getString(0))
- assertResult(2096)(result(1).getLong(1))
+ assertResult(2098)(result(1).getLong(1))
}
override def afterAll: Unit = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 3dc34d3..bfa498e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -165,6 +165,7 @@ public class DataMapWriterListener {
writer.finish();
}
}
+ registry.clear();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
index eb02ede..fcabef5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/AbstractDataLoadProcessorStep.java
@@ -170,7 +170,8 @@ public abstract class AbstractDataLoadProcessorStep {
carbonDataFileAttributes.getTaskId(),
bucketId,
0,
- String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+ String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
+ configuration.getSegmentId()));
return listener;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index bc28ace..5bed8b1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -64,7 +64,7 @@ public class TableProcessingOperations {
CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile path) {
String segmentId =
- CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy");
+ CarbonTablePath.DataFileUtil.getSegmentIdFromPath(path.getAbsolutePath() + "/dummy");
boolean found = false;
for (int j = 0; j < details.length; j++) {
if (details[j].getLoadName().equals(segmentId)) {
@@ -76,8 +76,8 @@ public class TableProcessingOperations {
}
});
for (int k = 0; k < listFiles.length; k++) {
- String segmentId =
- CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy");
+ String segmentId = CarbonTablePath.DataFileUtil
+ .getSegmentIdFromPath(listFiles[k].getAbsolutePath() + "/dummy");
if (isCompactionFlow) {
if (segmentId.contains(".")) {
CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath());
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 90c297e..4d3f3fc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.List;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
@@ -83,7 +84,7 @@ public class CarbonLoadModel implements Serializable {
/**
* load Id
*/
- private String segmentId;
+ private Segment segment;
private String allDictPath;
@@ -424,7 +425,7 @@ public class CarbonLoadModel implements Serializable {
copy.blocksID = blocksID;
copy.taskNo = taskNo;
copy.factTimeStamp = factTimeStamp;
- copy.segmentId = segmentId;
+ copy.segment = segment;
copy.serializationNullFormat = serializationNullFormat;
copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
copy.badRecordsAction = badRecordsAction;
@@ -479,7 +480,7 @@ public class CarbonLoadModel implements Serializable {
copyObj.blocksID = blocksID;
copyObj.taskNo = taskNo;
copyObj.factTimeStamp = factTimeStamp;
- copyObj.segmentId = segmentId;
+ copyObj.segment = segment;
copyObj.serializationNullFormat = serializationNullFormat;
copyObj.badRecordsLoggerEnable = badRecordsLoggerEnable;
copyObj.badRecordsAction = badRecordsAction;
@@ -609,14 +610,24 @@ public class CarbonLoadModel implements Serializable {
* @return load Id
*/
public String getSegmentId() {
- return segmentId;
+ if (segment != null) {
+ return segment.getSegmentNo();
+ } else {
+ return null;
+ }
}
/**
* @param segmentId
*/
public void setSegmentId(String segmentId) {
- this.segmentId = segmentId;
+ if (segmentId != null) {
+ this.segment = Segment.toSegment(segmentId);
+ }
+ }
+
+ public Segment getSegment() {
+ return segment;
}
/**
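
CarbonLoadModel now stores a Segment rather than a raw segment-id string, while the string-based accessors keep their old contract. A small sketch of the resulting behaviour, assuming Segment.toSegment parses a plain segment number:

    import org.apache.carbondata.core.datamap.Segment
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel

    val model = new CarbonLoadModel()
    model.setSegmentId("2")            // internally stored via Segment.toSegment("2")
    assert(model.getSegmentId == "2")  // delegates to segment.getSegmentNo

    // callers that also need the segment file can use the new accessor
    val segment: Segment = model.getSegment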
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index 7a11c8b..b22599d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -17,6 +17,7 @@
package org.apache.carbondata.processing.merger;
+import java.io.IOException;
import java.util.List;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -46,10 +47,11 @@ public abstract class AbstractResultProcessor {
public abstract void close();
protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
- CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+ CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel)
+ throws IOException {
CarbonDataFileAttributes carbonDataFileAttributes;
if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
- long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
+ long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegment(),
loadModel.getTablePath());
// Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
// be written in the same segment, so the TaskNo should be incremented by 1 from the max value.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index fef8ab9..dde18a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -175,7 +175,6 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
partitionSpec.getPartitions());
} catch (IOException e) {
- isCompactionSuccess = false;
throw e;
}
}
@@ -428,6 +427,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
.getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
tempStoreLocation, carbonStoreLocation);
+ carbonFactDataHandlerModel.setSegmentId(carbonLoadModel.getSegmentId());
setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 9a3258e..b877d52 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -63,7 +63,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
public RowResultMergerProcessor(String databaseName,
String tableName, SegmentProperties segProp, String[] tempStoreLocation,
- CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) {
+ CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec)
+ throws IOException {
this.segprop = segProp;
this.partitionSpec = partitionSpec;
this.loadModel = loadModel;
@@ -84,6 +85,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
tempStoreLocation, carbonStoreLocation);
setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
carbonFactDataHandlerModel.setCompactionFlow(true);
+ carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 221697f..9b09269 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -59,6 +59,7 @@ public class RowResultProcessor {
carbonFactDataHandlerModel.setBucketId(bucketId);
// Note: set compaction flow just to convert the decimal type
carbonFactDataHandlerModel.setCompactionFlow(true);
+ carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/60dfdd38/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 87a6de0..27249ab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -267,7 +267,8 @@ public class CarbonFactDataHandlerModel {
carbonDataFileAttributes.getTaskId(),
bucketId,
0,
- String.valueOf(carbonDataFileAttributes.getFactTimeStamp())));
+ String.valueOf(carbonDataFileAttributes.getFactTimeStamp()),
+ configuration.getSegmentId()));
}
carbonFactDataHandlerModel.dataMapWriterlistener = listener;
carbonFactDataHandlerModel.writingCoresCount = configuration.getWritingCoresCount();
@@ -337,7 +338,8 @@ public class CarbonFactDataHandlerModel {
CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(loadModel.getTaskNo()),
carbonFactDataHandlerModel.getBucketId(),
carbonFactDataHandlerModel.getTaskExtension(),
- String.valueOf(loadModel.getFactTimeStamp())));
+ String.valueOf(loadModel.getFactTimeStamp()),
+ loadModel.getSegmentId()));
carbonFactDataHandlerModel.dataMapWriterlistener = listener;
return carbonFactDataHandlerModel;