Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/20 15:44:42 UTC

[09/10] carbondata git commit: [CARBONDATA-2224][File Level Reader Support] Refactoring of #2055

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index d284e50..f421d44 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -113,11 +113,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     //data source file format
     if (sqlContext.sparkContext.version.startsWith("2.1")) {
       //data source file format
-      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
     } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
       //data source file format
       sql(
-        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
            |'$filePath' """.stripMargin)
     } else{
       // TO DO
@@ -162,11 +162,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
 
     if (sqlContext.sparkContext.version.startsWith("2.1")) {
       //data source file format
-      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
     } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
       //data source file format
       sql(
-        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
            |'$filePath' """.stripMargin)
     } else{
       // TO DO
@@ -184,6 +184,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
     cleanTestData()
   }
 
+  // TODO: Make the sparkCarbonFileFormat to work without index file
   test("Read sdk writer output file without Carbondata file should fail") {
     buildTestData(false)
     deleteIndexFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
@@ -194,11 +195,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
       //    data source file format
       if (sqlContext.sparkContext.version.startsWith("2.1")) {
         //data source file format
-        sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+        sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
       } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
         //data source file format
         sql(
-          s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+          s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
              |'$filePath' """.stripMargin)
       } else{
         // TO DO
@@ -224,11 +225,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
       //data source file format
       if (sqlContext.sparkContext.version.startsWith("2.1")) {
         //data source file format
-        sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+        sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
       } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
         //data source file format
         sql(
-          s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+          s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
              |'$filePath' """.stripMargin)
       } else{
         // TO DO
@@ -254,11 +255,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
 
     if (sqlContext.sparkContext.version.startsWith("2.1")) {
       //data source file format
-      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
     } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
       //data source file format
       sql(
-        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
            |'$filePath' """.stripMargin)
     } else{
       // TO DO
@@ -303,11 +304,11 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
 
     if (sqlContext.sparkContext.version.startsWith("2.1")) {
       //data source file format
-      sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
     } else if (sqlContext.sparkContext.version.startsWith("2.2")) {
       //data source file format
       sql(
-        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
            |'$filePath' """.stripMargin)
     } else{
       // TO DO

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index 9a46676..de91f2a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -132,11 +132,11 @@ object TestSparkCarbonFileFormatWithSparkSession {
     //data source file format
     if (spark.sparkContext.version.startsWith("2.1")) {
       //data source file format
-      spark.sql(s"""CREATE TABLE sdkOutputTable USING Carbonfile OPTIONS (PATH '$filePath') """)
+      spark.sql(s"""CREATE TABLE sdkOutputTable USING carbonfile OPTIONS (PATH '$filePath') """)
     } else if (spark.sparkContext.version.startsWith("2.2")) {
       //data source file format
       spark.sql(
-        s"""CREATE TABLE sdkOutputTable USING Carbonfile LOCATION
+        s"""CREATE TABLE sdkOutputTable USING carbonfile LOCATION
            |'$filePath' """.stripMargin)
     } else{
       // TO DO

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 4378c15..2ba6e5e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.command.CarbonMergerMapping
 
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -63,9 +63,9 @@ class CarbonIUDMergerRDD[K, V](
     val defaultParallelism = sparkContext.defaultParallelism
     val noOfBlocks = 0
 
-    CarbonTableInputFormat.setSegmentsToAccess(
+    CarbonInputFormat.setSegmentsToAccess(
       job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
-    CarbonTableInputFormat.setTableInfo(
+    CarbonInputFormat.setTableInfo(
       job.getConfiguration,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index da268c1..7ae2d14 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -263,10 +263,10 @@ class CarbonMergerRDD[K, V](
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
-    CarbonTableInputFormat.setPartitionsToPrune(
+    CarbonInputFormat.setPartitionsToPrune(
       job.getConfiguration,
       carbonMergerMapping.currentPartitions.map(_.asJava).orNull)
-    CarbonTableInputFormat.setTableInfo(job.getConfiguration,
+    CarbonInputFormat.setTableInfo(job.getConfiguration,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
     var updateDetails: UpdateVO = null
     // initialise query_id for job

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
index 5647427..84f6659 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanPartitionRDD.scala
@@ -45,7 +45,7 @@ import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResult
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper
 import org.apache.carbondata.core.util.{ByteUtil, DataTypeUtil}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.merger.CarbonCompactionUtil
 import org.apache.carbondata.processing.partition.spliter.CarbonSplitExecutor
@@ -96,7 +96,7 @@ class CarbonScanPartitionRDD(alterPartitionModel: AlterPartitionModel,
     val job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonTableInputFormat(absoluteTableIdentifier,
       partitionIds.toList.asJava, job)
-    CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
     job.getConfiguration.set("query.id", queryId)
 
     val splits = format.getSplitsOfOneSegment(job, segmentId,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6afd2c0..a9b8353 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -50,7 +50,7 @@ import org.apache.carbondata.core.stats.{QueryStatistic, QueryStatisticsConstant
 import org.apache.carbondata.core.statusmanager.FileFormat
 import org.apache.carbondata.core.util._
 import org.apache.carbondata.hadoop._
-import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonTableInputFormat}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.InitInputMetrics
@@ -90,7 +90,7 @@ class CarbonScanRDD(
     val jobConf = new JobConf(conf)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job = Job.getInstance(jobConf)
-    val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelexternal")
+    val fileLevelExternal = tableInfo.getFactTable().getTableProperties().get("_filelevelformat")
     val format = if (fileLevelExternal != null && fileLevelExternal.equalsIgnoreCase("true")) {
       prepareFileInputFormatForDriver(job.getConfiguration)
     } else {
@@ -432,53 +432,53 @@ class CarbonScanRDD(
   }
 
   def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
-    CarbonTableInputFormat.setTableInfo(conf, tableInfo)
-    CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
-    CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+    CarbonInputFormat.setTableInfo(conf, tableInfo)
+    CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+    CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
     if (partitionNames != null) {
-      CarbonTableInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+      CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
     }
     createInputFormat(conf)
   }
 
   def prepareFileInputFormatForDriver(conf: Configuration): CarbonFileInputFormat[Object] = {
-    CarbonFileInputFormat.setTableInfo(conf, tableInfo)
-    CarbonFileInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
-    CarbonFileInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
+    CarbonInputFormat.setTableInfo(conf, tableInfo)
+    CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
+    CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
     if (partitionNames != null) {
-      CarbonFileInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
+      CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
     }
     createFileInputFormat(conf)
   }
 
-  private def prepareInputFormatForExecutor(conf: Configuration): CarbonTableInputFormat[Object] = {
-    CarbonTableInputFormat.setCarbonReadSupport(conf, readSupport)
+  private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[Object] = {
+    CarbonInputFormat.setCarbonReadSupport(conf, readSupport)
     val tableInfo1 = getTableInfo
-    CarbonTableInputFormat.setTableInfo(conf, tableInfo1)
-    CarbonTableInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
-    CarbonTableInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
-    CarbonTableInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
+    CarbonInputFormat.setTableInfo(conf, tableInfo1)
+    CarbonInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
+    CarbonInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
+    CarbonInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
     createInputFormat(conf)
   }
 
   private def createFileInputFormat(conf: Configuration): CarbonFileInputFormat[Object] = {
     val format = new CarbonFileInputFormat[Object]
-    CarbonFileInputFormat.setTablePath(conf,
+    CarbonInputFormat.setTablePath(conf,
       identifier.appendWithLocalPrefix(identifier.getTablePath))
-    CarbonFileInputFormat.setQuerySegment(conf, identifier)
-    CarbonFileInputFormat.setFilterPredicates(conf, filterExpression)
-    CarbonFileInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonFileInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    CarbonInputFormat.setQuerySegment(conf, identifier)
+    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonInputFormat.setColumnProjection(conf, columnProjection)
+    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     if (CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
       CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
 
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
-      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+      CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
         .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
                      identifier.getCarbonTableIdentifier.getDatabaseName + "." +
                      identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)
@@ -489,22 +489,22 @@ class CarbonScanRDD(
 
   private def createInputFormat(conf: Configuration): CarbonTableInputFormat[Object] = {
     val format = new CarbonTableInputFormat[Object]
-    CarbonTableInputFormat.setTablePath(conf,
+    CarbonInputFormat.setTablePath(conf,
       identifier.appendWithLocalPrefix(identifier.getTablePath))
-    CarbonTableInputFormat.setQuerySegment(conf, identifier)
-    CarbonTableInputFormat.setFilterPredicates(conf, filterExpression)
-    CarbonTableInputFormat.setColumnProjection(conf, columnProjection)
-    CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+    CarbonInputFormat.setQuerySegment(conf, identifier)
+    CarbonInputFormat.setFilterPredicates(conf, filterExpression)
+    CarbonInputFormat.setColumnProjection(conf, columnProjection)
+    CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     if (CarbonProperties.getInstance().getProperty(
       CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
       CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT).toBoolean) {
-      CarbonTableInputFormat.setDataMapJob(conf, new SparkDataMapJob)
+      CarbonInputFormat.setDataMapJob(conf, new SparkDataMapJob)
     }
 
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
-      CarbonTableInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
+      CarbonInputFormat.setValidateSegmentsToAccess(conf, carbonSessionInfo.getSessionParams
           .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
                        identifier.getCarbonTableIdentifier.getDatabaseName + "." +
                        identifier.getCarbonTableIdentifier.getTableName, "true").toBoolean)

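Note (illustration, not part of the commit): the hunks above converge on one pattern — both concrete input formats are now configured through the static setters on their common parent, org.apache.carbondata.hadoop.api.CarbonInputFormat, instead of through CarbonTableInputFormat or CarbonFileInputFormat directly. A minimal Scala sketch of that pattern, using only the setter calls visible in this diff (the TableInfo parameter and its import path are assumptions for illustration):

    import org.apache.hadoop.conf.Configuration
    import org.apache.carbondata.core.metadata.schema.table.TableInfo
    import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableInputFormat}

    // Shared configuration path: the same Configuration can then back either concrete format.
    def configureJob(conf: Configuration, tableInfo: TableInfo): Unit = {
      CarbonInputFormat.setTableInfo(conf, tableInfo)
      CarbonInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
      CarbonInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
    }

    // Choose the concrete format afterwards, as CarbonScanRDD does above:
    //   new CarbonTableInputFormat[Object]   // table-level reads
    //   new CarbonFileInputFormat[Object]    // file-level reads ("carbonfile" data source)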
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 1656efa..48ebdb4 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -36,7 +36,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.util.CommonUtil
@@ -154,7 +154,7 @@ object PartitionUtils {
     val job = new Job(jobConf)
     val format = CarbonInputFormatUtil
       .createCarbonTableInputFormat(identifier, partitionIds.asJava, job)
-    CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
     val splits = format.getSplitsOfOneSegment(job, segmentId,
       oldPartitionIdList.map(_.asInstanceOf[Integer]).asJava, partitionInfo)
     val blockList = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index c286c50..2d19fd4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 
 case class CarbonCountStar(
     attributesRaw: Seq[Attribute],
@@ -45,7 +45,7 @@ case class CarbonCountStar(
   override def doExecute(): RDD[InternalRow] = {
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val (job, tableInputFormat) = createCarbonInputFormat(absoluteTableIdentifier)
-    CarbonTableInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier)
+    CarbonInputFormat.setQuerySegment(job.getConfiguration, absoluteTableIdentifier)
 
     // get row count
     val rowCount = CarbonUpdateUtil.getRowCount(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 10d55ef..8eaeab1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -43,7 +43,7 @@ import org.apache.carbondata.core.mutate.data.RowCountDetailsVO
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.processing.exception.MultipleMatchingException
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.spark.DeleteDelataResultImpl
@@ -90,7 +90,7 @@ object DeleteExecution {
     }
 
     val (carbonInputFormat, job) = createCarbonInputFormat(absoluteTableIdentifier)
-    CarbonTableInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, carbonTable.getTableInfo)
     val keyRdd = deleteRdd.map({ row =>
       val tupleId: String = row
         .getString(row.fieldIndex(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index cf22569..74da11a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -150,7 +150,7 @@ case class CarbonDropTableCommand(
 
       // delete table data only if it is not external table
       if (FileFactory.isFileExist(tablePath, fileType) &&
-          !(carbonTable.isExternalTable || carbonTable.isFileLevelExternalTable)) {
+          !(carbonTable.isExternalTable || carbonTable.isFileLevelFormat)) {
         val file = FileFactory.getCarbonFile(tablePath, fileType)
         CarbonUtil.deleteFoldersAndFilesSilent(file)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
index fa54e0d..2daece3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonFileFormat.scala
@@ -51,7 +51,7 @@ import org.apache.carbondata.core.scan.model.QueryModel
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader, InputMetricsStats}
-import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, DataMapJob}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, DataMapJob}
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 @InterfaceAudience.User
@@ -105,9 +105,9 @@ class SparkCarbonFileFormat extends FileFormat
     }
   }
 
-  override def shortName(): String = "Carbonfile"
+  override def shortName(): String = "carbonfile"
 
-  override def toString: String = "Carbonfile"
+  override def toString: String = "carbonfile"
 
   override def hashCode(): Int = getClass.hashCode()
 
@@ -179,10 +179,9 @@ class SparkCarbonFileFormat extends FileFormat
       supportBatchValue = supportBatch(sparkSession, dataSchema)
     }
 
-    CarbonFileInputFormat.setTableName(job.getConfiguration, "externaldummy")
-    CarbonFileInputFormat.setDatabaseName(job.getConfiguration, "default")
+    CarbonInputFormat.setTableName(job.getConfiguration, "externaldummy")
+    CarbonInputFormat.setDatabaseName(job.getConfiguration, "default")
     CarbonMetadata.getInstance.removeTable("default_externaldummy")
-    val dataMapJob: DataMapJob = CarbonFileInputFormat.getDataMapJob(job.getConfiguration)
     val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
 
     (file: PartitionedFile) => {
@@ -207,9 +206,9 @@ class SparkCarbonFileFormat extends FileFormat
         conf1.set("mapreduce.input.carboninputformat.tableName", "externaldummy")
         conf1.set("mapreduce.input.carboninputformat.databaseName", "default")
         conf1.set("mapreduce.input.fileinputformat.inputdir", tablePath)
-        CarbonFileInputFormat.setColumnProjection(conf1, carbonProjection)
+        CarbonInputFormat.setColumnProjection(conf1, carbonProjection)
         filter match {
-          case Some(c) => CarbonFileInputFormat.setFilterPredicates(conf1, c)
+          case Some(c) => CarbonInputFormat.setFilterPredicates(conf1, c)
           case None => None
         }
         val attemptContext = new TaskAttemptContextImpl(conf1, attemptId)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index d85ef68..b20349c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -112,7 +112,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (isCarbonTable) {
           val carbonTable = CarbonEnv.getCarbonTable(alterTableChangeDataTypeModel.databaseName,
             alterTableChangeDataTypeModel.tableName)(sparkSession)
-          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+          if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
           } else {
@@ -128,7 +128,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (isCarbonTable) {
           val carbonTable = CarbonEnv.getCarbonTable(alterTableAddColumnsModel.databaseName,
             alterTableAddColumnsModel.tableName)(sparkSession)
-          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+          if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
           } else {
@@ -144,7 +144,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (isCarbonTable) {
           val carbonTable = CarbonEnv.getCarbonTable(alterTableDropColumnModel.databaseName,
             alterTableDropColumnModel.tableName)(sparkSession)
-          if (carbonTable != null && carbonTable.isFileLevelExternalTable) {
+          if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
           } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 69fd366..55eb5ac 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -265,7 +265,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
       val table = try {
         val schemaPath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath)
         if (!FileFactory.isFileExist(schemaPath, FileFactory.getFileType(schemaPath)) &&
-            provider.equalsIgnoreCase("'Carbonfile'")) {
+            provider.equalsIgnoreCase("'carbonfile'")) {
           SchemaReader.inferSchema(identifier)
         }
         else {
@@ -277,12 +277,12 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
           operationNotAllowed(s"Invalid table path provided: ${tablePath.get} ", tableHeader)
       }
       // set "_external" property, so that DROP TABLE will not delete the data
-      if (provider.equalsIgnoreCase("'Carbonfile'")) {
-        table.getFactTable.getTableProperties.put("_filelevelexternal", "true")
+      if (provider.equalsIgnoreCase("'carbonfile'")) {
+        table.getFactTable.getTableProperties.put("_filelevelformat", "true")
         table.getFactTable.getTableProperties.put("_external", "false")
       } else {
         table.getFactTable.getTableProperties.put("_external", "true")
-        table.getFactTable.getTableProperties.put("_filelevelexternal", "false")
+        table.getFactTable.getTableProperties.put("_filelevelformat", "false")
       }
       table
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 1f5808a..e8ae67b 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -326,7 +326,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("'Carbonfile'") ||
+        fileStorage.equalsIgnoreCase("'carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
       val createTableTuple = (ctx.createTableHeader, ctx.skewSpec, ctx.bucketSpec,
         ctx.partitionColumns, ctx.columns, ctx.tablePropertyList, ctx.locationSpec(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index c28e4ba..3cb9bd6 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -325,7 +325,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser, sparkSes
     val fileStorage = helper.getFileStorage(ctx.createFileFormat)
 
     if (fileStorage.equalsIgnoreCase("'carbondata'") ||
-        fileStorage.equalsIgnoreCase("'Carbonfile'") ||
+        fileStorage.equalsIgnoreCase("'carbonfile'") ||
         fileStorage.equalsIgnoreCase("'org.apache.carbondata.format'")) {
       val createTableTuple = (ctx.createTableHeader, ctx.skewSpec,
         ctx.bucketSpec, ctx.partitionColumns, ctx.columns, ctx.tablePropertyList,ctx.locationSpec(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b384b6e1/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 47ef5f2..8ab658d 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -145,16 +145,16 @@ class StreamHandoffRDD[K, V](
     val inputSplit = split.asInstanceOf[HandoffPartition].split.value
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val hadoopConf = new Configuration()
-    CarbonTableInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
-    CarbonTableInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
-    CarbonTableInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
+    CarbonInputFormat.setDatabaseName(hadoopConf, carbonTable.getDatabaseName)
+    CarbonInputFormat.setTableName(hadoopConf, carbonTable.getTableName)
+    CarbonInputFormat.setTablePath(hadoopConf, carbonTable.getTablePath)
     val projection = new CarbonProjection
     val dataFields = carbonTable.getStreamStorageOrderColumn(carbonTable.getTableName)
     (0 until dataFields.size()).foreach { index =>
       projection.addColumn(dataFields.get(index).getColName)
     }
-    CarbonTableInputFormat.setColumnProjection(hadoopConf, projection)
-    CarbonTableInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
+    CarbonInputFormat.setColumnProjection(hadoopConf, projection)
+    CarbonInputFormat.setTableInfo(hadoopConf, carbonTable.getTableInfo)
     val attemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
     val format = new CarbonTableInputFormat[Array[Object]]()
     val model = format.createQueryModel(inputSplit, attemptContext)