Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/19 14:16:02 UTC
[1/2] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] Refactoring of #2055
Repository: carbondata
Updated Branches:
refs/heads/carbonfile 7a124ecd8 -> 99766b8af
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index d284e50..f421d44 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -113,11 +113,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
//data source file format
if (sqlContext.sparkContext.version.startsWith("2.1")) {
//data source file format
- sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
} else if (sqlContext.sparkContext.version.startsWith("2.2")) {
//data source file format
sql(
- s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
|'$filePath' """.stripMargin)
} else{
// TO DO
@@ -162,11 +162,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
if (sqlContext.sparkContext.version.startsWith("2.1")) {
//data source file format
- sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
} else if (sqlContext.sparkContext.version.startsWith("2.2")) {
//data source file format
sql(
- s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
|'$filePath' """.stripMargin)
} else{
// TO DO
@@ -184,6 +184,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
cleanTestData()
}
+ // TODO: Make the sparkCarbonFileFormat to work without index file
test("Read sdk writer output file without Carbondata file should fail") {
buildTestData(false)
deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
@@ -194,11 +195,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
// data source file format
if (sqlContext.sparkContext.version.startsWith("2.1")) {
//data source file format
- sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
} else if (sqlContext.sparkContext.version.startsWith("2.2")) {
//data source file format
sql(
- s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
|'$filePath' """.stripMargin)
} else{
// TO DO
@@ -224,11 +225,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
//data source file format
if (sqlContext.sparkContext.version.startsWith("2.1")) {
//data source file format
- sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
} else if (sqlContext.sparkContext.version.startsWith("2.2")) {
//data source file format
sql(
- s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
|'$filePath' """.stripMargin)
} else{
// TO DO
@@ -254,11 +255,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
if (sqlContext.sparkContext.version.startsWith("2.1")) {
//data source file format
- sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
} else if (sqlContext.sparkContext.version.startsWith("2.2")) {
//data source file format
sql(
- s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
|'$filePath' """.stripMargin)
} else{
// TO DO
@@ -303,11 +304,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
if (sqlContext.sparkContext.version.startsWith("2.1")) {
//data source file format
- sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
} else if (sqlContext.sparkContext.version.startsWith("2.2")) {
//data source file format
sql(
- s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
|'$filePath' """.stripMargin)
} else{
// TO DO
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 9a46676..de91f2a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -132,11 +132,11 @@ object TestSparkCarbonFileFormatWithSparkSession {
//data source file format
if (spark.sparkContext.version.startsWith("2.1")) {
//data source file format
- spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+ spark.sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
} else if (spark.sparkContext.version.startsWith("2.2")) {
//data source file format
spark.sql(
- s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+ s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
|'$filePath' """.stripMargin)
} else{
// TO DO
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 4378c15..2ba6e5e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -63,9 +63,9 @@ class CarbonIUDMergerRDD[K, V](
val defaultParallelism = sparkContext.defaultParallelism
val noOfBlocks = 0
- CarbonTableInputFormat.setSegmentsToAccess(
+ CarbonInputFormat.setSegmentsToAccess(
job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
- CarbonTableInputFormat.setTableInfo(
+ CarbonInputFormat.setTableInfo(
job.getConfiguration,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index da268c1..7ae2d14 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatus
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -263,10 +263,10 @@ class CarbonMergerRDD[K, V](
SparkHadoopUtil.get.addCredentials(jobConf)
val job: Job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
- CarbonTableInputFormat.setPartitionsToPrune(
+ CarbonInputFormat.setPartitionsToPrune(
job.getConfiguration,
carbonMergerMapping.currentPartitions.map(_.asJava).orNull)
- CarbonTableInputFormat.setTableInfo(job.getConfiguration,
+ CarbonInputFormat.setTableInfo(job.getConfiguration,
carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
var updateDetails: UpdateVO = null
// initialise query_id for job
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 5647427..84f6659 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResult
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.merger.CarbonCompactionUtil
import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor
@@ -96,7 +96,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
val job = new Job(jobConf)
val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier,
partitionIds.toList.asJava, job)
- CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
+ CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
job.getConfiguration.set("query.id", queryId)
val splits = format.getSplitsOfOneSegment(job, segmentId,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6afd2c0..a9b8353 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant
import org.apache.carbondata.core.statusmanager.FileFormat
import org.apache.carbondata.core.util._
import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.InitInputMetrics
@@ -90,7 +90,7 @@ class CarbonScanRDD(
val jobConf = new JobConf(conf)
SparkHadoopUtil.get.addCredentials(jobConf)
val job = Job.getInstance(jobConf)
- val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
+ val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelformat")
val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
prepareFileInputFormatForDriver(job.getConfiguration)
} else {
@@ -432,53 +432,53 @@ class CarbonScanRDD(
}
def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
- CarbonTableInputFormat.setTableInfo(conf, tableInfo)
- CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
- CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+ CarbonInputFormat.setTableInfo(conf, tableInfo)
+ CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+ CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
if (partitionNames != null) {
- CarbonTableInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+ CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
}
createInputFormat(conf)
}
def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = {
- CarbonFileInputFormat.setTableInfo(conf, tableInfo)
- CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
- CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+ CarbonInputFormat.setTableInfo(conf, tableInfo)
+ CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+ CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
if (partitionNames != null) {
- CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+ CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
}
createFileInputFormat(conf)
}
- private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
- CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
+ private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
+ CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
val tableInfo1 = getTableInfo
- CarbonTableInputFormat.setTableInfo(conf, tableInfo1)
- CarbonTableInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
- CarbonTableInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
- CarbonTableInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
+ CarbonInputFormat.setTableInfo(conf, tableInfo1)
+ CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
+ CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
+ CarbonInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
createInputFormat(conf)
}
private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = {
val format = new CarbonFileInputFormat[Object]
- CarbonFileInputFormat.setTablePath(conf,
+ CarbonInputFormat.setTablePath(conf,
identifier.appendWithLocalPrefix(identifier.getTablePath))
- CarbonFileInputFormat.setQuerySegment(conf, identifier)
- CarbonFileInputFormat.setFilterPredicates(conf, filterExpression)
- CarbonFileInputFormat.setColumnProjection(conf, columnProjection)
- CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ CarbonInputFormat.setQuerySegment(conf, identifier)
+ CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonInputFormat.setColumnProjection(conf, columnProjection)
+ CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
if (CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
- CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
}
// when validate segments is disabled in thread local update it to CarbonTableInputFormat
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
- CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+ CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
.getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
identifier.getCarbonTableIdentifier.getDatabaseName + "." +
identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
@@ -489,22 +489,22 @@ class CarbonScanRDD(
private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
val format = new CarbonTableInputFormat[Object]
- CarbonTableInputFormat.setTablePath(conf,
+ CarbonInputFormat.setTablePath(conf,
identifier.appendWithLocalPrefix(identifier.getTablePath))
- CarbonTableInputFormat.setQuerySegment(conf, identifier)
- CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
- CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
- CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ CarbonInputFormat.setQuerySegment(conf, identifier)
+ CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+ CarbonInputFormat.setColumnProjection(conf, columnProjection)
+ CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
if (CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
- CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+ CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
}
// when validate segments is disabled in thread local update it to CarbonTableInputFormat
val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
if (carbonSessionInfo != null) {
- CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+ CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
.getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
identifier.getCarbonTableIdentifier.getDatabaseName + "." +
identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 1656efa..48ebdb4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.CarbonInputSplit
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.util.CommonUtil
@@ -154,7 +154,7 @@ object PartitionUtils {
val job = new Job(jobConf)
val format = CarbonInputFormatUtil
.createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
- CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
+ CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
val splits = format.getSplitsOfOneSegment(job, segmentId,
oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index c286c50..2d19fd4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
case class CarbonCountStar(
attributesRaw: Seq[Attribute],
@@ -45,7 +45,7 @@ case class CarbonCountStar(
override def doExecute(): RDD[InternalRow] = {
val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
val (job, tableInputFormat) = createCarbonInputFormat(absoluteTableIdentifier)
- CarbonTableInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier)
+ CarbonInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier)
// get row count
val rowCount = CarbonUpdateUtil.getRowCount(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 10d55ef..8eaeab1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -43,7 +43,7 @@ import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.processing.exception.MultipleMatchingException
import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.spark.DeleteDelataResultImpl
@@ -90,7 +90,7 @@ object DeleteExecution {
}
val (carbonInputFormat, job) = createCarbonInputFormat(absoluteTableIdentifier)
- CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
+ CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
val keyRdd = deleteRdd.map({ row =>
val tupleId: String = row
.getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index cf22569..74da11a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -150,7 +150,7 @@ case class CarbonDropTableCommand(
// delete table data only if it is not external table
if (FileFactory.isFileExist(tablePath, fileType) &&
- !(carbonTable.isExternalTable || carbonTable.isFileLevelExternalTable)) {
+ !(carbonTable.isExternalTable || carbonTable.isFileLevelFormat)) {
val file = FileFactory.getCarbonFile(tablePath, fileType)
CarbonUtil.deleteFoldersAndFilesSilent(file)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index fa54e0d..2daece3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -51,7 +51,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
-import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, DataMapJob}
import org.apache.carbondata.spark.util.CarbonScalaUtil
@InterfaceAudience.User
@@ -105,9 +105,9 @@ class SparkCarbonFileFormat extends FileFormat
}
}
- override def shortName(): String = "Carbonfile"
+ override def shortName(): String = "carbonfile"
- override def toString: String = "Carbonfile"
+ override def toString: String = "carbonfile"
override def hashCode(): Int = getClass.hashCode()
@@ -179,10 +179,9 @@ class SparkCarbonFileFormat extends FileFormat
supportBatchValue = supportBatch(sparkSession, dataSchema)
}
- CarbonFileInputFormat.setTableName(job.getConfiguration, "externaldummy")
- CarbonFileInputFormat.setDatabaseName(job.getConfiguration, "default")
+ CarbonInputFormat.setTableName(job.getConfiguration, "externaldummy")
+ CarbonInputFormat.setDatabaseName(job.getConfiguration, "default")
CarbonMetadata.getInstance.removeTable("default_externaldummy")
- val dataMapJob: DataMapJob = CarbonFileInputFormat.getDataMapJob(job.getConfiguration)
val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
(file: PartitionedFile) => {
@@ -207,9 +206,9 @@ class SparkCarbonFileFormat extends FileFormat
conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy")
conf1.set("mapreduce.input.carboninputformat.databaseName", "default")
conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath)
- CarbonFileInputFormat.setColumnProjection(conf1, carbonProjection)
+ CarbonInputFormat.setColumnProjection(conf1, carbonProjection)
filter match {
- case Some(c) => CarbonFileInputFormat.setFilterPredicates(conf1, c)
+ case Some(c) => CarbonInputFormat.setFilterPredicates(conf1, c)
case None => None
}
val attemptContext = new TaskAttemptContextImpl(conf1, attemptId)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index d85ef68..b20349c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -112,7 +112,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
if (isCarbonTable) {
val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName,
alterTableChangeDataTypeModel.tableName)(sparkSession)
- if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+ if (carbonTable != null && carbonTable.isFileLevelFormat) {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on Carbon external fileformat table")
} else {
@@ -128,7 +128,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
if (isCarbonTable) {
val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName,
alterTableAddColumnsModel.tableName)(sparkSession)
- if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+ if (carbonTable != null && carbonTable.isFileLevelFormat) {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on Carbon external fileformat table")
} else {
@@ -144,7 +144,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
if (isCarbonTable) {
val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName,
alterTableDropColumnModel.tableName)(sparkSession)
- if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+ if (carbonTable != null && carbonTable.isFileLevelFormat) {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on Carbon external fileformat table")
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 69fd366..55eb5ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -265,7 +265,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
val table = try {
val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
- provider.equalsIgnoreCase("'Carbonfile'")) {
+ provider.equalsIgnoreCase("'carbonfile'")) {
SchemaReader.inferSchema(identifier)
}
else {
@@ -277,12 +277,12 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
}
// set "_external" property, so that DROP TABLE will not delete the data
- if (provider.equalsIgnoreCase("'Carbonfile'")) {
- table.getFactTable.getTableProperties.put("_filelevelexternal", "true")
+ if (provider.equalsIgnoreCase("'carbonfile'")) {
+ table.getFactTable.getTableProperties.put("_filelevelformat", "true")
table.getFactTable.getTableProperties.put("_external", "false")
} else {
table.getFactTable.getTableProperties.put("_external", "true")
- table.getFactTable.getTableProperties.put("_filelevelexternal", "false")
+ table.getFactTable.getTableProperties.put("_filelevelformat", "false")
}
table
} else {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index c6bab9e..b0e0ae4 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -326,7 +326,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
val fileStorage = helper.getFileStorage(ctx.createFileFormat)
if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("'Carbonfile'") ||
+ fileStorage.equalsIgnoreCase("'carbonfile'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index c28e4ba..3cb9bd6 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -325,7 +325,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
val fileStorage = helper.getFileStorage(ctx.createFileFormat)
if (fileStorage.equalsIgnoreCase("'carbondata'") ||
- fileStorage.equalsIgnoreCase("'Carbonfile'") ||
+ fileStorage.equalsIgnoreCase("'carbonfile'") ||
fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 47ef5f2..8ab658d 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -145,16 +145,16 @@ class StreamHandoffRDD[K, V](
val inputSplit = split.asInstanceOf[HandoffPartition].split.value
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val hadoopConf = new Configuration()
- CarbonTableInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
- CarbonTableInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
- CarbonTableInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
+ CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
+ CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
+ CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
val projection = new CarbonProjection
val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
(0 until dataFields.size()).foreach { index =>
projection.addColumn(dataFields.get(index).getColName)
}
- CarbonTableInputFormat.setColumnProjection(hadoopConf, projection)
- CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
+ CarbonInputFormat.setColumnProjection(hadoopConf, projection)
+ CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
val format = new CarbonTableInputFormat[Array[Object]]()
val model = format.createQueryModel(inputSplit, attemptContext)
[2/2] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] Refactoring of #2055
Posted by ja...@apache.org.
[CARBONDATA-2224][File Level Reader Support] Refactoring of #2055
Review comment fixes and refactoring of #2055
This closes #2069
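For orientation, the caller-side effect of this refactoring is that the static configuration setters are now invoked on the common CarbonInputFormat parent rather than on CarbonTableInputFormat or CarbonFileInputFormat. A minimal sketch, mirroring the HadoopFileExample change in this commit (the object name is illustrative; the database, table, and column names are just the example's):

    import org.apache.hadoop.conf.Configuration

    import org.apache.carbondata.hadoop.CarbonProjection
    import org.apache.carbondata.hadoop.api.CarbonInputFormat

    object ProjectionConfigSketch {
      def main(args: Array[String]): Unit = {
        // project two columns of the example table
        val projection = new CarbonProjection
        projection.addColumn("c1")
        projection.addColumn("c3")

        val conf = new Configuration()
        // previously CarbonTableInputFormat.setXxx(...); now routed through the shared parent
        CarbonInputFormat.setColumnProjection(conf, projection)
        CarbonInputFormat.setDatabaseName(conf, "default")
        CarbonInputFormat.setTableName(conf, "carbon1")
      }
    }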
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/99766b8a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/99766b8a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/99766b8a
Branch: refs/heads/carbonfile
Commit: 99766b8af0b021a0a06ffa893d09cc82774f9c66
Parents: 7a124ec
Author: Ajantha-Bhat <aj...@gmail.com>
Authored: Fri Mar 16 16:06:04 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Mar 19 22:15:45 2018 +0800
----------------------------------------------------------------------
.../core/metadata/schema/table/CarbonTable.java | 4 +-
.../apache/carbondata/core/util/CarbonUtil.java | 2 +-
.../carbondata/examples/HadoopFileExample.scala | 8 +-
.../hadoop/api/CarbonFileInputFormat.java | 535 ++-----------------
.../hadoop/api/CarbonInputFormat.java | 534 ++++++++++++++++++
.../hadoop/api/CarbonTableInputFormat.java | 463 +---------------
.../carbondata/hadoop/util/SchemaReader.java | 8 +-
...FileInputFormatWithExternalCarbonTable.scala | 16 +-
...tCreateTableUsingSparkCarbonFileFormat.scala | 25 +-
...tSparkCarbonFileFormatWithSparkSession.scala | 4 +-
.../spark/rdd/CarbonIUDMergerRDD.scala | 6 +-
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 6 +-
.../spark/rdd/CarbonScanPartitionRDD.scala | 4 +-
.../carbondata/spark/rdd/CarbonScanRDD.scala | 60 +--
.../org/apache/spark/util/PartitionUtils.scala | 4 +-
.../org/apache/spark/sql/CarbonCountStar.scala | 4 +-
.../command/mutation/DeleteExecution.scala | 4 +-
.../command/table/CarbonDropTableCommand.scala | 2 +-
.../datasources/SparkCarbonFileFormat.scala | 15 +-
.../sql/execution/strategy/DDLStrategy.scala | 6 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 8 +-
.../spark/sql/hive/CarbonSessionState.scala | 2 +-
.../spark/sql/hive/CarbonSessionState.scala | 2 +-
.../carbondata/streaming/StreamHandoffRDD.scala | 12 +-
24 files changed, 669 insertions(+), 1065 deletions(-)
----------------------------------------------------------------------
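Besides the input format consolidation, the hunks below rename the table property "_filelevelexternal" to "_filelevelformat" (and CarbonTable.isFileLevelExternalTable to isFileLevelFormat). A minimal sketch of that property convention, based on the CarbonSparkSqlParser and CarbonDropTableCommand hunks (the object and method names here are invented for illustration; only the property keys and values come from the diff):

    import java.util.{Map => JMap}

    object FileLevelFormatSketch {
      def markProvider(props: JMap[String, String], provider: String): Unit = {
        if (provider.equalsIgnoreCase("'carbonfile'")) {
          // file-level format table: data files live outside table management,
          // so DROP TABLE must not delete them
          props.put("_filelevelformat", "true")
          props.put("_external", "false")
        } else {
          props.put("_external", "true")
          props.put("_filelevelformat", "false")
        }
      }

      // mirrors CarbonTable.isFileLevelFormat: null-safe check of the property
      def isFileLevelFormat(props: JMap[String, String]): Boolean =
        "true".equalsIgnoreCase(props.get("_filelevelformat"))
    }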
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 278dc96..9e0d80a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -826,8 +826,8 @@ public class CarbonTable implements Serializable {
return external != null && external.equalsIgnoreCase("true");
}
- public boolean isFileLevelExternalTable() {
- String external = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal");
+ public boolean isFileLevelFormat() {
+ String external = tableInfo.getFactTable().getTableProperties().get("_filelevelformat");
return external != null && external.equalsIgnoreCase("true");
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index ff49edf..855bdee 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2213,7 +2213,7 @@ public final class CarbonUtil {
* @param schemaFilePath
* @return
*/
- public static org.apache.carbondata.format.TableInfo inferSchemaFileExternalTable(
+ public static org.apache.carbondata.format.TableInfo inferSchema(
String carbonDataFilePath, AbsoluteTableIdentifier absoluteTableIdentifier,
boolean schemaExists) throws IOException {
TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
index d75abc2..465e660 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/HadoopFileExample.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.examples
import org.apache.hadoop.conf.Configuration
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
import org.apache.carbondata.hadoop.CarbonProjection
// scalastyle:off println
@@ -34,9 +34,9 @@ object HadoopFileExample {
projection.addColumn("c1") // column c1
projection.addColumn("c3") // column c3
val conf = new Configuration()
- CarbonTableInputFormat.setColumnProjection(conf, projection)
- CarbonTableInputFormat.setDatabaseName(conf, "default")
- CarbonTableInputFormat.setTableName(conf, "carbon1")
+ CarbonInputFormat.setColumnProjection(conf, projection)
+ CarbonInputFormat.setDatabaseName(conf, "default")
+ CarbonInputFormat.setTableName(conf, "carbon1")
val sc = spark.sparkContext
val input = sc.newAPIHadoopFile(s"${ExampleUtils.storeLocation}/default/carbon1",
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index b86b1cc..ff532b7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -17,11 +17,8 @@
package org.apache.carbondata.hadoop.api;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.io.Serializable;
-import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.LinkedList;
@@ -30,21 +27,11 @@ import java.util.Map;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.Segment;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.exception.InvalidConfigurationException;
-import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
-import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.mutate.UpdateVO;
@@ -52,241 +39,58 @@ import org.apache.carbondata.core.scan.expression.Expression;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeConverter;
-import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
import org.apache.carbondata.hadoop.util.SchemaReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.security.TokenCache;
/**
- * Input format of CarbonData file.
+ * InputFormat for reading carbondata files without table level metadata support,
+ * schema is inferred as following steps:
+ * 1. read from schema file is exists
+ * 2. read from data file footer
*
* @param <T>
*/
@InterfaceAudience.User
@InterfaceStability.Evolving
-public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implements Serializable {
-
- public static final String READ_SUPPORT_CLASS = "carbon.read.support.class";
- // comma separated list of input segment numbers
- public static final String INPUT_SEGMENT_NUMBERS =
- "mapreduce.input.carboninputformat.segmentnumbers";
- private static final String VALIDATE_INPUT_SEGMENT_IDs =
- "mapreduce.input.carboninputformat.validsegments";
- // comma separated list of input files
- public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
- private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
- private static final Log LOG = LogFactory.getLog(CarbonFileInputFormat.class);
- private static final String FILTER_PREDICATE =
- "mapreduce.input.carboninputformat.filter.predicate";
- private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
- private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
- private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
- private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
- private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
- public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
- public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
- private static final String PARTITIONS_TO_PRUNE =
- "mapreduce.input.carboninputformat.partitions.to.prune";
- public static final String UPADTE_T =
- "mapreduce.input.carboninputformat.partitions.to.prune";
+public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Serializable {
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
- /**
- * Set the `tableInfo` in `configuration`
- */
- public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
- throws IOException {
- if (null != tableInfo) {
- configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
- }
- }
-
- /**
- * Get TableInfo object from `configuration`
- */
- private static TableInfo getTableInfo(Configuration configuration) throws IOException {
- String tableInfoStr = configuration.get(TABLE_INFO);
- if (tableInfoStr == null) {
- return null;
+ protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ CarbonTable carbonTableTemp;
+ if (carbonTable == null) {
+ // carbon table should be created either from deserialized table info (schema saved in
+ // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
+ TableInfo tableInfo = getTableInfo(configuration);
+ CarbonTable localCarbonTable;
+ if (tableInfo != null) {
+ localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
+ } else {
+ String schemaPath = CarbonTablePath
+ .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
+ if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
+ TableInfo tableInfoInfer =
+ SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
+ localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
+ } else {
+ localCarbonTable =
+ SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
+ }
+ }
+ this.carbonTable = localCarbonTable;
+ return localCarbonTable;
} else {
- TableInfo output = new TableInfo();
- output.readFields(
- new DataInputStream(
- new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
- return output;
- }
- }
-
-
- public static void setTablePath(Configuration configuration, String tablePath) {
- configuration.set(FileInputFormat.INPUT_DIR, tablePath);
- }
-
- public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
- configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
- }
-
-
- public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
- throws IOException {
- if (dataMapJob != null) {
- String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
- configuration.set(DATA_MAP_DSTR, toString);
- }
- }
-
- public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
- String jobString = configuration.get(DATA_MAP_DSTR);
- if (jobString != null) {
- return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
- }
- return null;
- }
-
- /**
- * It sets unresolved filter expression.
- *
- * @param configuration
- * @param filterExpression
- */
- public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
- if (filterExpression == null) {
- return;
- }
- try {
- String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
- configuration.set(FILTER_PREDICATE, filterString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting filter expression to Job", e);
- }
- }
-
- public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
- if (projection == null || projection.isEmpty()) {
- return;
- }
- String[] allColumns = projection.getAllColumns();
- StringBuilder builder = new StringBuilder();
- for (String column : allColumns) {
- builder.append(column).append(",");
- }
- String columnString = builder.toString();
- columnString = columnString.substring(0, columnString.length() - 1);
- configuration.set(COLUMN_PROJECTION, columnString);
- }
-
- public static String getColumnProjection(Configuration configuration) {
- return configuration.get(COLUMN_PROJECTION);
- }
-
-
- /**
- * Set list of segments to access
- */
- public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
- configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
- }
-
- /**
- * Set `CARBON_INPUT_SEGMENTS` from property to configuration
- */
- public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
- String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
- String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
- String segmentNumbersFromProperty = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
- if (!segmentNumbersFromProperty.trim().equals("*")) {
- CarbonFileInputFormat
- .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
- }
- }
-
- /**
- * set list of segment to access
- */
- public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
- configuration.set(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
- }
-
- /**
- * get list of segment to access
- */
- public static boolean getValidateSegmentsToAccess(Configuration configuration) {
- return configuration.get(CarbonFileInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
- .equalsIgnoreCase("true");
- }
-
- /**
- * set list of partitions to prune
- */
- public static void setPartitionsToPrune(Configuration configuration,
- List<PartitionSpec> partitions) {
- if (partitions == null) {
- return;
- }
- try {
- String partitionString =
- ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
- configuration.set(PARTITIONS_TO_PRUNE, partitionString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting patition information to Job", e);
- }
- }
-
- /**
- * get list of partitions to prune
- */
- private static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
- throws IOException {
- String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
- if (partitionString != null) {
- return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
- }
- return null;
- }
-
- public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
- throws IOException {
- String tablePath = configuration.get(INPUT_DIR, "");
- try {
- return AbsoluteTableIdentifier
- .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
- } catch (InvalidConfigurationException e) {
- throw new IOException(e);
+ carbonTableTemp = this.carbonTable;
+ return carbonTableTemp;
}
}
@@ -306,8 +110,6 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement
if (null == carbonTable) {
throw new IOException("Missing/Corrupt schema file for table.");
}
- // TableDataMap blockletMap = DataMapStoreManager.getInstance()
- // .getDataMap(identifier, BlockletDataMap.NAME, BlockletDataMapFactory.class.getName());
if (getValidateSegmentsToAccess(job.getConfiguration())) {
// get all valid segments and set them into the configuration
@@ -346,8 +148,6 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement
return null;
}
-
-
/**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR, CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS
@@ -404,279 +204,4 @@ public class CarbonFileInputFormat<T> extends FileInputFormat<Void, T> implement
}
return result;
}
-
- protected Expression getFilterPredicates(Configuration configuration) {
- try {
- String filterExprString = configuration.get(FILTER_PREDICATE);
- if (filterExprString == null) {
- return null;
- }
- Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
- return (Expression) filter;
- } catch (IOException e) {
- throw new RuntimeException("Error while reading filter expression", e);
- }
- }
-
- /**
- * get data blocks of given segment
- */
- private List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
- AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
- BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
- List<Integer> oldPartitionIdList) throws IOException {
-
- QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
- QueryStatistic statistic = new QueryStatistic();
-
- // get tokens for all the required FileSystem for table path
- TokenCache.obtainTokensForNamenodes(job.getCredentials(),
- new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
- boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
- DataMapExprWrapper dataMapExprWrapper =
- DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
- DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
- List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
- List<ExtendedBlocklet> prunedBlocklets;
- if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
- DistributableDataMapFormat datamapDstr =
- new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
- segmentIds, partitionsToPrune,
- BlockletDataMapFactory.class.getName());
- prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
- // Apply expression on the blocklets.
- prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
- } else {
- prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
- }
-
- List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
- int partitionIndex = 0;
- List<Integer> partitionIdList = new ArrayList<>();
- if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
- partitionIdList = partitionInfo.getPartitionIds();
- }
- for (ExtendedBlocklet blocklet : prunedBlocklets) {
- long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
- CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
-
- // OldPartitionIdList is only used in alter table partition command because it change
- // partition info first and then read data.
- // For other normal query should use newest partitionIdList
- if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
- if (oldPartitionIdList != null) {
- partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
- } else {
- partitionIndex = partitionIdList.indexOf((int)partitionId);
- }
- }
- if (partitionIndex != -1) {
- // matchedPartitions variable will be null in two cases as follows
- // 1. the table is not a partition table
- // 2. the table is a partition table, and all partitions are matched by query
- // for partition table, the task id of carbaondata file name is the partition id.
- // if this partition is not required, here will skip it.
- if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
- CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
- if (inputSplit != null) {
- resultFilterredBlocks.add(inputSplit);
- }
- }
- }
- }
- statistic
- .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
- recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
- return resultFilterredBlocks;
- }
-
- private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
- CarbonInputSplit split =
- CarbonInputSplit.from(blocklet.getSegmentId(),
- blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
- blocklet.getLength(), blocklet.getLocations()),
- ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
- blocklet.getDataMapWriterPath());
- split.setDetailInfo(blocklet.getDetailInfo());
- return split;
- }
-
- @Override
- public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- Configuration configuration = taskAttemptContext.getConfiguration();
- QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
- CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
- return new CarbonRecordReader<T>(queryModel, readSupport);
- }
-
- public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException {
- Configuration configuration = taskAttemptContext.getConfiguration();
- CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
- TableProvider tableProvider = new SingleTableProvider(carbonTable);
-
- // query plan includes projection column
- String projectionString = getColumnProjection(configuration);
- String[] projectionColumnNames = null;
- if (projectionString != null) {
- projectionColumnNames = projectionString.split(",");
- }
- QueryModel queryModel = carbonTable.createQueryWithProjection(
- projectionColumnNames, getDataTypeConverter(configuration));
-
- // set the filter to the query model in order to filter blocklet before scan
- Expression filter = getFilterPredicates(configuration);
- boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
- // getAllMeasures returns list of visible and invisible columns
- boolean[] isFilterMeasures =
- new boolean[carbonTable.getAllMeasures().size()];
- CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
- isFilterMeasures);
- queryModel.setIsFilterDimensions(isFilterDimensions);
- queryModel.setIsFilterMeasures(isFilterMeasures);
- FilterResolverIntf filterIntf = CarbonInputFormatUtil
- .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
- queryModel.setFilterExpressionResolverTree(filterIntf);
-
- // update the file level index store if there are invalid segment
- if (inputSplit instanceof CarbonMultiBlockSplit) {
- CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
- List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
- if (invalidSegments.size() > 0) {
- queryModel.setInvalidSegmentIds(invalidSegments);
- }
- List<UpdateVO> invalidTimestampRangeList =
- split.getAllSplits().get(0).getInvalidTimestampRange();
- if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
- queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
- }
- }
- return queryModel;
- }
-
- private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
- CarbonTable carbonTableTemp;
- if (carbonTable == null) {
- // carbon table should be created either from deserialized table info (schema saved in
- // hive metastore) or by reading schema in HDFS (schema saved in HDFS)
- TableInfo tableInfo = getTableInfo(configuration);
- CarbonTable localCarbonTable;
- if (tableInfo != null) {
- localCarbonTable = CarbonTable.buildFromTableInfo(tableInfo);
- } else {
- String schemaPath = CarbonTablePath
- .getSchemaFilePath(getAbsoluteTableIdentifier(configuration).getTablePath());
- if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath))) {
- TableInfo tableInfoInfer =
- SchemaReader.inferSchema(getAbsoluteTableIdentifier(configuration));
- localCarbonTable = CarbonTable.buildFromTableInfo(tableInfoInfer);
- } else {
- localCarbonTable =
- SchemaReader.readCarbonTableFromStore(getAbsoluteTableIdentifier(configuration));
- }
- }
- this.carbonTable = localCarbonTable;
- return localCarbonTable;
- } else {
- carbonTableTemp = this.carbonTable;
- return carbonTableTemp;
- }
- }
-
-
- public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
- String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
- //By default it uses dictionary decoder read class
- CarbonReadSupport<T> readSupport = null;
- if (readSupportClass != null) {
- try {
- Class<?> myClass = Class.forName(readSupportClass);
- Constructor<?> constructor = myClass.getConstructors()[0];
- Object object = constructor.newInstance();
- if (object instanceof CarbonReadSupport) {
- readSupport = (CarbonReadSupport) object;
- }
- } catch (ClassNotFoundException ex) {
- LOG.error("Class " + readSupportClass + "not found", ex);
- } catch (Exception ex) {
- LOG.error("Error while creating " + readSupportClass, ex);
- }
- } else {
- readSupport = new DictionaryDecodeReadSupport<>();
- }
- return readSupport;
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path filename) {
- try {
- // Don't split the file if it is local file system
- FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
- if (fileSystem instanceof LocalFileSystem) {
- return false;
- }
- } catch (Exception e) {
- return true;
- }
- return true;
- }
-
- /**
- * return valid segment to access
- */
- private String[] getSegmentsToAccess(JobContext job) {
- String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, "");
- if (segmentString.trim().isEmpty()) {
- return new String[0];
- }
- return segmentString.split(",");
- }
-
- public static DataTypeConverter getDataTypeConverter(Configuration configuration)
- throws IOException {
- String converter = configuration.get(CARBON_CONVERTER);
- if (converter == null) {
- return new DataTypeConverterImpl();
- }
- return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
- }
-
- public static void setDatabaseName(Configuration configuration, String databaseName) {
- if (null != databaseName) {
- configuration.set(DATABASE_NAME, databaseName);
- }
- }
-
- public static String getDatabaseName(Configuration configuration)
- throws InvalidConfigurationException {
- String databseName = configuration.get(DATABASE_NAME);
- if (null == databseName) {
- throw new InvalidConfigurationException("Database name is not set.");
- }
- return databseName;
- }
-
- public static void setTableName(Configuration configuration, String tableName) {
- if (null != tableName) {
- configuration.set(TABLE_NAME, tableName);
- }
- }
-
- public static String getTableName(Configuration configuration)
- throws InvalidConfigurationException {
- String tableName = configuration.get(TABLE_NAME);
- if (tableName == null) {
- throw new InvalidConfigurationException("Table name is not set");
- }
- return tableName;
- }
-
- public org.apache.hadoop.mapred.RecordReader<Void, T> getRecordReader(
- org.apache.hadoop.mapred.InputSplit split, JobConf job, Reporter reporter)
- throws IOException {
- return null;
- }
}
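Note: most of the helpers removed above move into the new shared base class CarbonInputFormat (next hunk). For example, setQuerySegment is driven by a CarbonProperties entry keyed per database and table; a minimal sketch of restricting reads to two segments follows, where the "carbon.input.segments." prefix is assumed to be the value of CarbonCommonConstants.CARBON_INPUT_SEGMENTS, and conf, carbonTable and default.sales are placeholders:

// Restrict reads to segments 1 and 3 of default.sales before planning splits.
// "*" (the default) means all valid segments are read.
CarbonProperties.getInstance()
    .addProperty("carbon.input.segments.default.sales", "1,3");
CarbonInputFormat.setQuerySegment(conf, carbonTable.getAbsoluteTableIdentifier());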
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
new file mode 100644
index 0000000..3cc9c5f
--- /dev/null
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hadoop.api;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.DataMapChooser;
+import org.apache.carbondata.core.datamap.DataMapLevel;
+import org.apache.carbondata.core.datamap.Segment;
+import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
+import org.apache.carbondata.core.exception.InvalidConfigurationException;
+import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.SingleTableProvider;
+import org.apache.carbondata.core.scan.filter.TableProvider;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.stats.QueryStatistic;
+import org.apache.carbondata.core.stats.QueryStatisticsConstants;
+import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeConverter;
+import org.apache.carbondata.core.util.DataTypeConverterImpl;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.CarbonProjection;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+
+/**
+ * Base class for the carbondata input formats. There are two implementations:
+ * 1. CarbonFileInputFormat: for reading carbondata files without table level metadata support.
+ *
+ * 2. CarbonTableInputFormat: for reading carbondata files with table level metadata support,
+ * such as segment and explicit schema metadata.
+ *
+ * @param <T>
+ */
+public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
+ // comma separated list of input segment numbers
+ public static final String INPUT_SEGMENT_NUMBERS =
+ "mapreduce.input.carboninputformat.segmentnumbers";
+ private static final String VALIDATE_INPUT_SEGMENT_IDs =
+ "mapreduce.input.carboninputformat.validsegments";
+ // list of partition ids used by the alter table partition command
+ private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
+ private static final Log LOG = LogFactory.getLog(CarbonInputFormat.class);
+ private static final String FILTER_PREDICATE =
+ "mapreduce.input.carboninputformat.filter.predicate";
+ private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
+ private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
+ private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
+ private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
+ private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
+ public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
+ public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
+ private static final String PARTITIONS_TO_PRUNE =
+ "mapreduce.input.carboninputformat.partitions.to.prune";
+
+ /**
+ * Set the `tableInfo` in `configuration`
+ */
+ public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
+ throws IOException {
+ if (null != tableInfo) {
+ configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
+ }
+ }
+
+ /**
+ * Get TableInfo object from `configuration`
+ */
+ protected static TableInfo getTableInfo(Configuration configuration) throws IOException {
+ String tableInfoStr = configuration.get(TABLE_INFO);
+ if (tableInfoStr == null) {
+ return null;
+ } else {
+ TableInfo output = new TableInfo();
+ output.readFields(new DataInputStream(
+ new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
+ return output;
+ }
+ }
+
+ /**
+ * Get the cached CarbonTable or create it by TableInfo in `configuration`
+ */
+ protected abstract CarbonTable getOrCreateCarbonTable(Configuration configuration)
+ throws IOException;
+
+ public static void setTablePath(Configuration configuration, String tablePath) {
+ configuration.set(FileInputFormat.INPUT_DIR, tablePath);
+ }
+
+ public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
+ configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
+ }
+
+ public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
+ throws IOException {
+ if (dataMapJob != null) {
+ String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
+ configuration.set(DATA_MAP_DSTR, toString);
+ }
+ }
+
+ public static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
+ String jobString = configuration.get(DATA_MAP_DSTR);
+ if (jobString != null) {
+ return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
+ }
+ return null;
+ }
+
+ /**
+ * Set the unresolved filter expression into the job configuration.
+ *
+ * @param configuration Hadoop configuration to update
+ * @param filterExpression unresolved filter expression; ignored if null
+ */
+ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
+ if (filterExpression == null) {
+ return;
+ }
+ try {
+ String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
+ configuration.set(FILTER_PREDICATE, filterString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting filter expression to Job", e);
+ }
+ }
+
+ public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
+ if (projection == null || projection.isEmpty()) {
+ return;
+ }
+ String[] allColumns = projection.getAllColumns();
+ StringBuilder builder = new StringBuilder();
+ for (String column : allColumns) {
+ builder.append(column).append(",");
+ }
+ String columnString = builder.toString();
+ columnString = columnString.substring(0, columnString.length() - 1);
+ configuration.set(COLUMN_PROJECTION, columnString);
+ }
+
+ public static String getColumnProjection(Configuration configuration) {
+ return configuration.get(COLUMN_PROJECTION);
+ }
+
+ /**
+ * Set list of segments to access
+ */
+ public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
+ configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
+ }
+
+ /**
+ * Set `CARBON_INPUT_SEGMENTS` from property to configuration
+ */
+ public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
+ String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
+ String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
+ String segmentNumbersFromProperty = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
+ if (!segmentNumbersFromProperty.trim().equals("*")) {
+ CarbonInputFormat
+ .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
+ }
+ }
+
+ /**
+ * set whether the segments to access should be validated
+ */
+ public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
+ configuration.set(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
+ }
+
+ /**
+ * get whether the segments to access should be validated
+ */
+ public static boolean getValidateSegmentsToAccess(Configuration configuration) {
+ return configuration.get(CarbonInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
+ .equalsIgnoreCase("true");
+ }
+
+ /**
+ * set list of partitions to prune
+ */
+ public static void setPartitionsToPrune(Configuration configuration,
+ List<PartitionSpec> partitions) {
+ if (partitions == null) {
+ return;
+ }
+ try {
+ String partitionString =
+ ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
+ configuration.set(PARTITIONS_TO_PRUNE, partitionString);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while setting patition information to Job" + partitions, e);
+ }
+ }
+
+ /**
+ * get list of partitions to prune
+ */
+ public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
+ throws IOException {
+ String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
+ if (partitionString != null) {
+ return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
+ }
+ return null;
+ }
+
+ public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+ throws IOException {
+ String tablePath = configuration.get(INPUT_DIR, "");
+ try {
+ return AbsoluteTableIdentifier
+ .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
+ } catch (InvalidConfigurationException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * The configuration FileInputFormat.INPUT_DIR
+ * is used to get the table path to read.
+ *
+ * @param job
+ * @return List<InputSplit> list of CarbonInputSplit
+ * @throws IOException
+ */
+ @Override public abstract List<InputSplit> getSplits(JobContext job) throws IOException;
+
+ protected Expression getFilterPredicates(Configuration configuration) {
+ try {
+ String filterExprString = configuration.get(FILTER_PREDICATE);
+ if (filterExprString == null) {
+ return null;
+ }
+ Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
+ return (Expression) filter;
+ } catch (IOException e) {
+ throw new RuntimeException("Error while reading filter expression", e);
+ }
+ }
+
+ /**
+ * get data blocks of given segment
+ */
+ protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
+ AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+ BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
+ List<Integer> oldPartitionIdList) throws IOException {
+
+ QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
+ QueryStatistic statistic = new QueryStatistic();
+
+ // get tokens for all the required FileSystem for table path
+ TokenCache.obtainTokensForNamenodes(job.getCredentials(),
+ new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+ boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
+ CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
+ DataMapExprWrapper dataMapExprWrapper =
+ DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
+ DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
+ List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
+ List<ExtendedBlocklet> prunedBlocklets;
+ if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
+ DistributableDataMapFormat datamapDstr =
+ new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, segmentIds,
+ partitionsToPrune, BlockletDataMapFactory.class.getName());
+ prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
+ // Apply expression on the blocklets.
+ prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
+ } else {
+ prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
+ }
+
+ List<CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
+ int partitionIndex = 0;
+ List<Integer> partitionIdList = new ArrayList<>();
+ if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+ partitionIdList = partitionInfo.getPartitionIds();
+ }
+ for (ExtendedBlocklet blocklet : prunedBlocklets) {
+ long partitionId = CarbonTablePath.DataFileUtil
+ .getTaskIdFromTaskNo(CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
+
+ // oldPartitionIdList is only used by the alter table partition command because it changes
+ // the partition info first and then reads data.
+ // All other queries should use the newest partitionIdList.
+ if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
+ if (oldPartitionIdList != null) {
+ partitionIndex = oldPartitionIdList.indexOf((int) partitionId);
+ } else {
+ partitionIndex = partitionIdList.indexOf((int) partitionId);
+ }
+ }
+ if (partitionIndex != -1) {
+ // matchedPartitions will be null in two cases:
+ // 1. the table is not a partition table
+ // 2. the table is a partition table and all partitions are matched by the query
+ // For a partition table, the task id in the carbondata file name is the partition id;
+ // if this partition is not required, it is skipped here.
+ if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
+ CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
+ if (inputSplit != null) {
+ resultFilterredBlocks.add(inputSplit);
+ }
+ }
+ }
+ }
+ statistic
+ .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
+ recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
+ return resultFilterredBlocks;
+ }
+
+ private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
+ CarbonInputSplit split = CarbonInputSplit
+ .from(blocklet.getSegmentId(), blocklet.getBlockletId(),
+ new FileSplit(new Path(blocklet.getPath()), 0, blocklet.getLength(),
+ blocklet.getLocations()),
+ ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
+ blocklet.getDataMapWriterPath());
+ split.setDetailInfo(blocklet.getDetailInfo());
+ return split;
+ }
+
+ @Override public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
+ TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
+ CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
+ return new CarbonRecordReader<T>(queryModel, readSupport);
+ }
+
+ public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+ throws IOException {
+ Configuration configuration = taskAttemptContext.getConfiguration();
+ CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
+ TableProvider tableProvider = new SingleTableProvider(carbonTable);
+
+ // query plan includes projection column
+ String projectionString = getColumnProjection(configuration);
+ String[] projectionColumnNames = null;
+ if (projectionString != null) {
+ projectionColumnNames = projectionString.split(",");
+ }
+ QueryModel queryModel = carbonTable
+ .createQueryWithProjection(projectionColumnNames, getDataTypeConverter(configuration));
+
+ // set the filter to the query model in order to filter blocklet before scan
+ Expression filter = getFilterPredicates(configuration);
+ boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
+ // getAllMeasures returns list of visible and invisible columns
+ boolean[] isFilterMeasures = new boolean[carbonTable.getAllMeasures().size()];
+ CarbonInputFormatUtil
+ .processFilterExpression(filter, carbonTable, isFilterDimensions, isFilterMeasures);
+ queryModel.setIsFilterDimensions(isFilterDimensions);
+ queryModel.setIsFilterMeasures(isFilterMeasures);
+ FilterResolverIntf filterIntf = CarbonInputFormatUtil
+ .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
+ queryModel.setFilterExpressionResolverTree(filterIntf);
+
+ // update the file level index store if there are invalid segments
+ if (inputSplit instanceof CarbonMultiBlockSplit) {
+ CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
+ List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
+ if (invalidSegments.size() > 0) {
+ queryModel.setInvalidSegmentIds(invalidSegments);
+ }
+ List<UpdateVO> invalidTimestampRangeList =
+ split.getAllSplits().get(0).getInvalidTimestampRange();
+ if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
+ queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
+ }
+ }
+ return queryModel;
+ }
+
+ public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
+ String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
+ // by default use the dictionary decode read support class
+ CarbonReadSupport<T> readSupport = null;
+ if (readSupportClass != null) {
+ try {
+ Class<?> myClass = Class.forName(readSupportClass);
+ Constructor<?> constructor = myClass.getConstructors()[0];
+ Object object = constructor.newInstance();
+ if (object instanceof CarbonReadSupport) {
+ readSupport = (CarbonReadSupport) object;
+ }
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Class " + readSupportClass + "not found", ex);
+ } catch (Exception ex) {
+ LOG.error("Error while creating " + readSupportClass, ex);
+ }
+ } else {
+ readSupport = new DictionaryDecodeReadSupport<>();
+ }
+ return readSupport;
+ }
+
+ @Override protected boolean isSplitable(JobContext context, Path filename) {
+ try {
+ // don't split the file if it is on the local file system
+ FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
+ if (fileSystem instanceof LocalFileSystem) {
+ return false;
+ }
+ } catch (Exception e) {
+ return true;
+ }
+ return true;
+ }
+
+ public static void setCarbonReadSupport(Configuration configuration,
+ Class<? extends CarbonReadSupport> readSupportClass) {
+ if (readSupportClass != null) {
+ configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
+ }
+ }
+
+ /**
+ * Setting the converter is optional; if the user does not set it, a default
+ * DataTypeConverterImpl is used when reading from the store.
+ *
+ * @param configuration Hadoop configuration to update
+ * @param converter data type converter for the calling compute engine
+ * @throws IOException
+ */
+ public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter)
+ throws IOException {
+ if (null != converter) {
+ configuration.set(CARBON_CONVERTER,
+ ObjectSerializationUtil.convertObjectToString(converter));
+ }
+ }
+
+ public static DataTypeConverter getDataTypeConverter(Configuration configuration)
+ throws IOException {
+ String converter = configuration.get(CARBON_CONVERTER);
+ if (converter == null) {
+ return new DataTypeConverterImpl();
+ }
+ return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
+ }
+
+ public static void setDatabaseName(Configuration configuration, String databaseName) {
+ if (null != databaseName) {
+ configuration.set(DATABASE_NAME, databaseName);
+ }
+ }
+
+ public static String getDatabaseName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String databaseName = configuration.get(DATABASE_NAME);
+ if (null == databaseName) {
+ throw new InvalidConfigurationException("Database name is not set.");
+ }
+ return databaseName;
+ }
+
+ public static void setTableName(Configuration configuration, String tableName) {
+ if (null != tableName) {
+ configuration.set(TABLE_NAME, tableName);
+ }
+ }
+
+ public static String getTableName(Configuration configuration)
+ throws InvalidConfigurationException {
+ String tableName = configuration.get(TABLE_NAME);
+ if (tableName == null) {
+ throw new InvalidConfigurationException("Table name is not set");
+ }
+ return tableName;
+ }
+}
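As a usage sketch of the new base class: the static setters above populate a Hadoop Configuration that the concrete formats read back when planning splits and creating record readers. The helper below is illustrative only; the table path, database/table names, TableInfo, filter and projection are placeholders supplied by the caller.

// Illustrative driver-side helper wiring the configuration keys defined in CarbonInputFormat.
static List<InputSplit> planCarbonSplits(String tablePath, TableInfo tableInfo,
    Expression filter, CarbonProjection projection) throws IOException {
  Configuration conf = new Configuration();
  CarbonInputFormat.setTablePath(conf, tablePath);          // FileInputFormat.INPUT_DIR
  CarbonInputFormat.setDatabaseName(conf, "default");
  CarbonInputFormat.setTableName(conf, "sales");
  CarbonInputFormat.setTableInfo(conf, tableInfo);          // serialized under TABLE_INFO
  CarbonInputFormat.setFilterPredicates(conf, filter);      // unresolved filter Expression
  CarbonInputFormat.setColumnProjection(conf, projection);  // comma separated projection
  CarbonInputFormat.setValidateSegmentsToAccess(conf, false);
  return new CarbonTableInputFormat<Object[]>().getSplits(Job.getInstance(conf));
}

On the task side, createRecordReader then rebuilds the QueryModel from the same keys via createQueryModel.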
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index bcc487e..efe962d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -17,11 +17,8 @@
package org.apache.carbondata.hadoop.api;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -31,21 +28,14 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datamap.DataMapChooser;
-import org.apache.carbondata.core.datamap.DataMapLevel;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datamap.Segment;
import org.apache.carbondata.core.datamap.TableDataMap;
-import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.exception.InvalidConfigurationException;
import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapFactory;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
import org.apache.carbondata.core.metadata.schema.PartitionInfo;
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -60,29 +50,15 @@ import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
import org.apache.carbondata.core.scan.filter.SingleTableProvider;
import org.apache.carbondata.core.scan.filter.TableProvider;
import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
-import org.apache.carbondata.core.scan.model.QueryModel;
-import org.apache.carbondata.core.stats.QueryStatistic;
-import org.apache.carbondata.core.stats.QueryStatisticsConstants;
-import org.apache.carbondata.core.stats.QueryStatisticsRecorder;
import org.apache.carbondata.core.statusmanager.FileFormat;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeConverter;
-import org.apache.carbondata.core.util.DataTypeConverterImpl;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.BlockIndex;
import org.apache.carbondata.hadoop.CarbonInputSplit;
-import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
-import org.apache.carbondata.hadoop.CarbonProjection;
-import org.apache.carbondata.hadoop.CarbonRecordReader;
-import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
-import org.apache.carbondata.hadoop.readsupport.impl.DictionaryDecodeReadSupport;
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
-import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
import org.apache.carbondata.hadoop.util.SchemaReader;
import org.apache.commons.logging.Log;
@@ -91,80 +67,38 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.hadoop.mapreduce.security.TokenCache;
/**
- * Input format of CarbonData file.
+ * InputFormat for reading carbondata files with table level metadata support,
+ * such as segment and explicit schema metadata.
*
* @param <T>
*/
-public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
+public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
// comma separated list of input segment numbers
public static final String INPUT_SEGMENT_NUMBERS =
"mapreduce.input.carboninputformat.segmentnumbers";
- private static final String VALIDATE_INPUT_SEGMENT_IDs =
- "mapreduce.input.carboninputformat.validsegments";
// comma separated list of input files
public static final String INPUT_FILES = "mapreduce.input.carboninputformat.files";
private static final String ALTER_PARTITION_ID = "mapreduce.input.carboninputformat.partitionid";
private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
- private static final String FILTER_PREDICATE =
- "mapreduce.input.carboninputformat.filter.predicate";
- private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
- private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
- private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
- private static final String PARTITIONS_TO_PRUNE =
- "mapreduce.input.carboninputformat.partitions.to.prune";
- public static final String UPADTE_T =
- "mapreduce.input.carboninputformat.partitions.to.prune";
-
// a cache for carbon table, it will be used in task side
private CarbonTable carbonTable;
/**
- * Set the `tableInfo` in `configuration`
- */
- public static void setTableInfo(Configuration configuration, TableInfo tableInfo)
- throws IOException {
- if (null != tableInfo) {
- configuration.set(TABLE_INFO, CarbonUtil.encodeToString(tableInfo.serialize()));
- }
- }
-
- /**
- * Get TableInfo object from `configuration`
- */
- private static TableInfo getTableInfo(Configuration configuration) throws IOException {
- String tableInfoStr = configuration.get(TABLE_INFO);
- if (tableInfoStr == null) {
- return null;
- } else {
- TableInfo output = new TableInfo();
- output.readFields(
- new DataInputStream(
- new ByteArrayInputStream(CarbonUtil.decodeStringToBytes(tableInfoStr))));
- return output;
- }
- }
-
- /**
* Get the cached CarbonTable or create it by TableInfo in `configuration`
*/
- private CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
+ protected CarbonTable getOrCreateCarbonTable(Configuration configuration) throws IOException {
if (carbonTable == null) {
// carbon table should be created either from deserialized table info (schema saved in
// hive metastore) or by reading schema in HDFS (schema saved in HDFS)
@@ -183,150 +117,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
}
}
- public static void setTablePath(Configuration configuration, String tablePath) {
- configuration.set(FileInputFormat.INPUT_DIR, tablePath);
- }
-
- public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {
- configuration.set(ALTER_PARTITION_ID, partitionIds.toString());
- }
-
-
- public static void setDataMapJob(Configuration configuration, DataMapJob dataMapJob)
- throws IOException {
- if (dataMapJob != null) {
- String toString = ObjectSerializationUtil.convertObjectToString(dataMapJob);
- configuration.set(DATA_MAP_DSTR, toString);
- }
- }
-
- private static DataMapJob getDataMapJob(Configuration configuration) throws IOException {
- String jobString = configuration.get(DATA_MAP_DSTR);
- if (jobString != null) {
- return (DataMapJob) ObjectSerializationUtil.convertStringToObject(jobString);
- }
- return null;
- }
-
- /**
- * It sets unresolved filter expression.
- *
- * @param configuration
- * @param filterExpression
- */
- public static void setFilterPredicates(Configuration configuration, Expression filterExpression) {
- if (filterExpression == null) {
- return;
- }
- try {
- String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression);
- configuration.set(FILTER_PREDICATE, filterString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting filter expression to Job", e);
- }
- }
-
- public static void setColumnProjection(Configuration configuration, CarbonProjection projection) {
- if (projection == null || projection.isEmpty()) {
- return;
- }
- String[] allColumns = projection.getAllColumns();
- StringBuilder builder = new StringBuilder();
- for (String column : allColumns) {
- builder.append(column).append(",");
- }
- String columnString = builder.toString();
- columnString = columnString.substring(0, columnString.length() - 1);
- configuration.set(COLUMN_PROJECTION, columnString);
- }
-
- public static String getColumnProjection(Configuration configuration) {
- return configuration.get(COLUMN_PROJECTION);
- }
-
- public static void setCarbonReadSupport(Configuration configuration,
- Class<? extends CarbonReadSupport> readSupportClass) {
- if (readSupportClass != null) {
- configuration.set(CARBON_READ_SUPPORT, readSupportClass.getName());
- }
- }
-
- /**
- * Set list of segments to access
- */
- public static void setSegmentsToAccess(Configuration configuration, List<Segment> validSegments) {
- configuration.set(INPUT_SEGMENT_NUMBERS, CarbonUtil.convertToString(validSegments));
- }
-
- /**
- * Set `CARBON_INPUT_SEGMENTS` from property to configuration
- */
- public static void setQuerySegment(Configuration conf, AbsoluteTableIdentifier identifier) {
- String dbName = identifier.getCarbonTableIdentifier().getDatabaseName().toLowerCase();
- String tbName = identifier.getCarbonTableIdentifier().getTableName().toLowerCase();
- String segmentNumbersFromProperty = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS + dbName + "." + tbName, "*");
- if (!segmentNumbersFromProperty.trim().equals("*")) {
- CarbonTableInputFormat
- .setSegmentsToAccess(conf, Segment.toSegmentList(segmentNumbersFromProperty.split(",")));
- }
- }
-
- /**
- * set list of segment to access
- */
- public static void setValidateSegmentsToAccess(Configuration configuration, Boolean validate) {
- configuration.set(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, validate.toString());
- }
-
- /**
- * get list of segment to access
- */
- public static boolean getValidateSegmentsToAccess(Configuration configuration) {
- return configuration.get(CarbonTableInputFormat.VALIDATE_INPUT_SEGMENT_IDs, "true")
- .equalsIgnoreCase("true");
- }
-
- /**
- * set list of partitions to prune
- */
- public static void setPartitionsToPrune(Configuration configuration,
- List<PartitionSpec> partitions) {
- if (partitions == null) {
- return;
- }
- try {
- String partitionString =
- ObjectSerializationUtil.convertObjectToString(new ArrayList<>(partitions));
- configuration.set(PARTITIONS_TO_PRUNE, partitionString);
- } catch (Exception e) {
- throw new RuntimeException("Error while setting patition information to Job" + partitions, e);
- }
- }
-
- /**
- * get list of partitions to prune
- */
- public static List<PartitionSpec> getPartitionsToPrune(Configuration configuration)
- throws IOException {
- String partitionString = configuration.get(PARTITIONS_TO_PRUNE);
- if (partitionString != null) {
- return (List<PartitionSpec>) ObjectSerializationUtil.convertStringToObject(partitionString);
- }
- return null;
- }
-
- public AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
- throws IOException {
- String tablePath = configuration.get(INPUT_DIR, "");
- try {
- return AbsoluteTableIdentifier
- .from(tablePath, getDatabaseName(configuration), getTableName(configuration));
- } catch (InvalidConfigurationException e) {
- throw new IOException(e);
- }
- }
-
/**
* {@inheritDoc}
* Configurations FileInputFormat.INPUT_DIR
@@ -362,8 +152,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
return getSplitsOfStreaming(job, identifier, streamSegments);
}
-
-
List<Segment> filteredSegmentToAccess = getFilteredSegment(job, segments.getValidSegments());
if (filteredSegmentToAccess.size() == 0) {
return new ArrayList<>(0);
@@ -716,195 +504,6 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
return result;
}
- protected Expression getFilterPredicates(Configuration configuration) {
- try {
- String filterExprString = configuration.get(FILTER_PREDICATE);
- if (filterExprString == null) {
- return null;
- }
- Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString);
- return (Expression) filter;
- } catch (IOException e) {
- throw new RuntimeException("Error while reading filter expression", e);
- }
- }
-
- /**
- * get data blocks of given segment
- */
- private List<org.apache.carbondata.hadoop.CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
- AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
- BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
- List<Integer> oldPartitionIdList) throws IOException {
-
- QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
- QueryStatistic statistic = new QueryStatistic();
-
- // get tokens for all the required FileSystem for table path
- TokenCache.obtainTokensForNamenodes(job.getCredentials(),
- new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
- boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
- CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
- DataMapExprWrapper dataMapExprWrapper =
- DataMapChooser.get().choose(getOrCreateCarbonTable(job.getConfiguration()), resolver);
- DataMapJob dataMapJob = getDataMapJob(job.getConfiguration());
- List<PartitionSpec> partitionsToPrune = getPartitionsToPrune(job.getConfiguration());
- List<ExtendedBlocklet> prunedBlocklets;
- if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
- DistributableDataMapFormat datamapDstr =
- new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper,
- segmentIds, partitionsToPrune,
- BlockletDataMapFactory.class.getName());
- prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
- // Apply expression on the blocklets.
- prunedBlocklets = dataMapExprWrapper.pruneBlocklets(prunedBlocklets);
- } else {
- prunedBlocklets = dataMapExprWrapper.prune(segmentIds, partitionsToPrune);
- }
-
- List<org.apache.carbondata.hadoop.CarbonInputSplit> resultFilterredBlocks = new ArrayList<>();
- int partitionIndex = 0;
- List<Integer> partitionIdList = new ArrayList<>();
- if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
- partitionIdList = partitionInfo.getPartitionIds();
- }
- for (ExtendedBlocklet blocklet : prunedBlocklets) {
- long partitionId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(
- CarbonTablePath.DataFileUtil.getTaskNo(blocklet.getPath()));
-
- // OldPartitionIdList is only used in alter table partition command because it change
- // partition info first and then read data.
- // For other normal query should use newest partitionIdList
- if (partitionInfo != null && partitionInfo.getPartitionType() != PartitionType.NATIVE_HIVE) {
- if (oldPartitionIdList != null) {
- partitionIndex = oldPartitionIdList.indexOf((int)partitionId);
- } else {
- partitionIndex = partitionIdList.indexOf((int)partitionId);
- }
- }
- if (partitionIndex != -1) {
- // matchedPartitions variable will be null in two cases as follows
- // 1. the table is not a partition table
- // 2. the table is a partition table, and all partitions are matched by query
- // for partition table, the task id of carbaondata file name is the partition id.
- // if this partition is not required, here will skip it.
- if (matchedPartitions == null || matchedPartitions.get(partitionIndex)) {
- CarbonInputSplit inputSplit = convertToCarbonInputSplit(blocklet);
- if (inputSplit != null) {
- resultFilterredBlocks.add(inputSplit);
- }
- }
- }
- }
- statistic
- .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_DRIVER, System.currentTimeMillis());
- recorder.recordStatisticsForDriver(statistic, job.getConfiguration().get("query.id"));
- return resultFilterredBlocks;
- }
-
- private CarbonInputSplit convertToCarbonInputSplit(ExtendedBlocklet blocklet) throws IOException {
- org.apache.carbondata.hadoop.CarbonInputSplit split =
- org.apache.carbondata.hadoop.CarbonInputSplit.from(blocklet.getSegmentId(),
- blocklet.getBlockletId(), new FileSplit(new Path(blocklet.getPath()), 0,
- blocklet.getLength(), blocklet.getLocations()),
- ColumnarFormatVersion.valueOf((short) blocklet.getDetailInfo().getVersionNumber()),
- blocklet.getDataMapWriterPath());
- split.setDetailInfo(blocklet.getDetailInfo());
- return split;
- }
-
- @Override
- public RecordReader<Void, T> createRecordReader(InputSplit inputSplit,
- TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- Configuration configuration = taskAttemptContext.getConfiguration();
- QueryModel queryModel = createQueryModel(inputSplit, taskAttemptContext);
- CarbonReadSupport<T> readSupport = getReadSupportClass(configuration);
- return new CarbonRecordReader<T>(queryModel, readSupport);
- }
-
- public QueryModel createQueryModel(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException {
- Configuration configuration = taskAttemptContext.getConfiguration();
- CarbonTable carbonTable = getOrCreateCarbonTable(configuration);
- TableProvider tableProvider = new SingleTableProvider(carbonTable);
-
- // query plan includes projection column
- String projectionString = getColumnProjection(configuration);
- String[] projectionColumnNames = null;
- if (projectionString != null) {
- projectionColumnNames = projectionString.split(",");
- }
- QueryModel queryModel = carbonTable.createQueryWithProjection(
- projectionColumnNames, getDataTypeConverter(configuration));
-
- // set the filter to the query model in order to filter blocklet before scan
- Expression filter = getFilterPredicates(configuration);
- boolean[] isFilterDimensions = new boolean[carbonTable.getDimensionOrdinalMax()];
- // getAllMeasures returns list of visible and invisible columns
- boolean[] isFilterMeasures =
- new boolean[carbonTable.getAllMeasures().size()];
- CarbonInputFormatUtil.processFilterExpression(filter, carbonTable, isFilterDimensions,
- isFilterMeasures);
- queryModel.setIsFilterDimensions(isFilterDimensions);
- queryModel.setIsFilterMeasures(isFilterMeasures);
- FilterResolverIntf filterIntf = CarbonInputFormatUtil
- .resolveFilter(filter, carbonTable.getAbsoluteTableIdentifier(), tableProvider);
- queryModel.setFilterExpressionResolverTree(filterIntf);
-
- // update the file level index store if there are invalid segment
- if (inputSplit instanceof CarbonMultiBlockSplit) {
- CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit;
- List<String> invalidSegments = split.getAllSplits().get(0).getInvalidSegments();
- if (invalidSegments.size() > 0) {
- queryModel.setInvalidSegmentIds(invalidSegments);
- }
- List<UpdateVO> invalidTimestampRangeList =
- split.getAllSplits().get(0).getInvalidTimestampRange();
- if ((null != invalidTimestampRangeList) && (invalidTimestampRangeList.size() > 0)) {
- queryModel.setInvalidBlockForSegmentId(invalidTimestampRangeList);
- }
- }
- return queryModel;
- }
-
- public CarbonReadSupport<T> getReadSupportClass(Configuration configuration) {
- String readSupportClass = configuration.get(CARBON_READ_SUPPORT);
- //By default it uses dictionary decoder read class
- CarbonReadSupport<T> readSupport = null;
- if (readSupportClass != null) {
- try {
- Class<?> myClass = Class.forName(readSupportClass);
- Constructor<?> constructor = myClass.getConstructors()[0];
- Object object = constructor.newInstance();
- if (object instanceof CarbonReadSupport) {
- readSupport = (CarbonReadSupport) object;
- }
- } catch (ClassNotFoundException ex) {
- LOG.error("Class " + readSupportClass + "not found", ex);
- } catch (Exception ex) {
- LOG.error("Error while creating " + readSupportClass, ex);
- }
- } else {
- readSupport = new DictionaryDecodeReadSupport<>();
- }
- return readSupport;
- }
-
- @Override
- protected boolean isSplitable(JobContext context, Path filename) {
- try {
- // Don't split the file if it is local file system
- FileSystem fileSystem = filename.getFileSystem(context.getConfiguration());
- if (fileSystem instanceof LocalFileSystem) {
- return false;
- }
- } catch (Exception e) {
- return true;
- }
- return true;
- }
-
/**
* return valid segment to access
*/
@@ -969,58 +568,4 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
return new BlockMappingVO(blockRowCountMapping, segmentAndBlockCountMapping);
}
-
- /**
- * It is optional, if user does not set then it reads from store
- *
- * @param configuration
- * @param converter is the Data type converter for different computing engine
- * @throws IOException
- */
- public static void setDataTypeConverter(Configuration configuration, DataTypeConverter converter)
- throws IOException {
- if (null != converter) {
- configuration.set(CARBON_CONVERTER,
- ObjectSerializationUtil.convertObjectToString(converter));
- }
- }
-
- public static DataTypeConverter getDataTypeConverter(Configuration configuration)
- throws IOException {
- String converter = configuration.get(CARBON_CONVERTER);
- if (converter == null) {
- return new DataTypeConverterImpl();
- }
- return (DataTypeConverter) ObjectSerializationUtil.convertStringToObject(converter);
- }
-
- public static void setDatabaseName(Configuration configuration, String databaseName) {
- if (null != databaseName) {
- configuration.set(DATABASE_NAME, databaseName);
- }
- }
-
- public static String getDatabaseName(Configuration configuration)
- throws InvalidConfigurationException {
- String databseName = configuration.get(DATABASE_NAME);
- if (null == databseName) {
- throw new InvalidConfigurationException("Database name is not set.");
- }
- return databseName;
- }
-
- public static void setTableName(Configuration configuration, String tableName) {
- if (null != tableName) {
- configuration.set(TABLE_NAME, tableName);
- }
- }
-
- public static String getTableName(Configuration configuration)
- throws InvalidConfigurationException {
- String tableName = configuration.get(TABLE_NAME);
- if (tableName == null) {
- throw new InvalidConfigurationException("Table name is not set");
- }
- return tableName;
- }
}
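The static helpers removed at the end of this class (setDataTypeConverter, setDatabaseName/getDatabaseName, setTableName/getTableName) are the knobs a Hadoop job uses to point the input format at a table before record readers are created. A minimal driver-side sketch, written against the pre-refactor class since this hunk does not show where the helpers live afterwards; database and table names are placeholders and the package imports are assumed from this branch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.carbondata.core.util.DataTypeConverterImpl;
    import org.apache.carbondata.hadoop.api.CarbonTableInputFormat;

    public final class CarbonInputConfigExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Both setters are null-safe: a null value leaves the configuration untouched.
        CarbonTableInputFormat.setDatabaseName(conf, "default");
        CarbonTableInputFormat.setTableName(conf, "sdk_output_table");
        // Optional: supply an engine-specific converter; DataTypeConverterImpl is the
        // default the getter falls back to when nothing is configured.
        CarbonTableInputFormat.setDataTypeConverter(conf, new DataTypeConverterImpl());
        // The getters throw InvalidConfigurationException when the value is missing.
        String db = CarbonTableInputFormat.getDatabaseName(conf);
        String table = CarbonTableInputFormat.getTableName(conf);
        System.out.println(db + "." + table);
      }
    }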
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index ab7c333..9df59e6 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -87,11 +87,11 @@ public class SchemaReader {
// Convert the ColumnSchema -> TableSchema -> TableInfo.
// Return the TableInfo.
org.apache.carbondata.format.TableInfo tableInfo =
- CarbonUtil.inferSchemaFileExternalTable(identifier.getTablePath(), identifier, false);
+ CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false);
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
- TableInfo wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, identifier.getDatabaseName(),
- identifier.getTableName(), identifier.getTablePath());
+ TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+ tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
+ identifier.getTablePath());
return wrapperTableInfo;
}
}
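In SchemaReader the helper is renamed from CarbonUtil.inferSchemaFileExternalTable to CarbonUtil.inferSchema and the converter call is only reflowed, so the net flow is unchanged: infer the Thrift-format schema from the carbondata files at the table path, then convert it into the in-memory wrapper TableInfo. A condensed sketch of that flow; the AbsoluteTableIdentifier type and the package imports are assumptions taken from this branch, while the accessors and the boolean flag mirror the hunk above:

    import java.io.IOException;

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
    import org.apache.carbondata.core.metadata.converter.SchemaConverter;
    import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
    import org.apache.carbondata.core.metadata.schema.table.TableInfo;
    import org.apache.carbondata.core.util.CarbonUtil;

    public final class InferredSchemaExample {
      // Infer the Thrift schema from the files at the table path, then convert it to
      // the wrapper TableInfo handed back to callers, as in the hunk above.
      static TableInfo readInferredSchema(AbsoluteTableIdentifier identifier) throws IOException {
        org.apache.carbondata.format.TableInfo external =
            CarbonUtil.inferSchema(identifier.getTablePath(), identifier, false);
        SchemaConverter converter = new ThriftWrapperSchemaConverterImpl();
        return converter.fromExternalToWrapperTableInfo(
            external, identifier.getDatabaseName(), identifier.getTableName(),
            identifier.getTablePath());
      }
    }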
http://git-wip-us.apache.org/repos/asf/carbondata/blob/99766b8a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 8b1f63f..7841a23 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -105,14 +105,14 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
}
//TO DO, need to remove segment dependency and tableIdentifier Dependency
- test("read carbondata files (sdk Writer Output) using the Carbonfile ") {
+ test("read carbondata files (sdk Writer Output) using the carbonfile ") {
buildTestData(false)
assert(new File(writerPath).exists())
sql("DROP TABLE IF EXISTS sdkOutputTable")
- //new provider Carbonfile
+ //new provider carbonfile
sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
|'$writerPath' """.stripMargin)
sql("Describe formatted sdkOutputTable").show(false)
@@ -152,7 +152,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
//data source file format
sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
|'$writerPath' """.stripMargin)
val exception = intercept[MalformedCarbonCommandException]
@@ -176,7 +176,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
//data source file format
sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
|'$writerPath' """.stripMargin)
//org.apache.spark.SparkException: Index file not present to read the carbondata file
@@ -192,7 +192,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
cleanTestData()
}
-
+ // TODO: Make the sparkCarbonFileFormat to work without index file
test("Read sdk writer output file without Carbondata file should fail") {
buildTestData(false)
deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
@@ -202,7 +202,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
val exception = intercept[Exception] {
// data source file format
sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
|'$writerPath' """.stripMargin)
}
assert(exception.getMessage()
@@ -225,7 +225,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
val exception = intercept[Exception] {
//data source file format
sql(
- s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'Carbonfile' LOCATION
+ s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbonfile' LOCATION
|'$writerPath' """.stripMargin)
sql("select * from sdkOutputTable").show(false)