Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/30 05:12:18 UTC
[10/13] incubator-carbondata git commit: Removed kettle-related code and refactored
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
index 885a0f0..851f7e9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
@@ -69,18 +69,6 @@ class BadRecordLoggerSharedDictionaryTest extends QueryTest with BeforeAndAfterA
sql(
s"""LOAD DATA INPATH '$csvFilePath' INTO TABLE testdrive OPTIONS('DELIMITER'=',',
|'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL',
- |'FILEHEADER'= 'ID,CUST_ID,cust_name', 'USE_KETTLE' = 'TRUE')""".stripMargin)
- } catch {
- case e: Throwable =>
- assert(e.getMessage.contains("Data load failed due to bad record"))
- }
- }
-
- test("dataload with no kettle") {
- try {
- sql(
- s"""LOAD DATA INPATH '$csvFilePath' INTO TABLE testdrive OPTIONS('DELIMITER'=',',
- |'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL',
|'FILEHEADER'= 'ID,CUST_ID,cust_name')""".stripMargin)
} catch {
case e: Throwable =>
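With the kettle path gone, the bad-record tests keep only the new-flow load statement. For reference, a minimal sketch of such a load after this change (the table name and CSV path are illustrative; 'BAD_RECORDS_ACTION'='FAIL' makes the load throw on a bad record, which the test then asserts on):

    sql(
      s"""LOAD DATA INPATH '/tmp/sample.csv' INTO TABLE testdrive OPTIONS('DELIMITER'=',',
         |'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL',
         |'FILEHEADER'= 'ID,CUST_ID,cust_name')""".stripMargin)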
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
index 8a9d8af..1d456d3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
@@ -41,7 +41,7 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
sql(
s"""
|LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_two_pass
- |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='false')
+ |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
""".stripMargin)
sql(
@@ -54,7 +54,7 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
sql(
s"""
|LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass
- |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true')
+ |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
""".stripMargin)
}
@@ -75,7 +75,7 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
sql(
s"""
|LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass_2
- |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true', 'COLUMNDICT'=
+ |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true', 'COLUMNDICT'=
|'country:$resourcesPath/columndictionary/country.csv, name:$resourcesPath/columndictionary/name.csv')
""".stripMargin)
@@ -89,12 +89,12 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
sql(
s"""
|LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_two_pass
- |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='false')
+ |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
""".stripMargin)
sql(
s"""
|LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_one_pass
- |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true')
+ |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
""".stripMargin)
checkAnswer(
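Both load modes should yield identical table contents; a hypothetical verification in the style of this suite (the comparison itself is assumed, the table names come from the diff above):

    checkAnswer(
      sql("select * from table_one_pass"),
      sql("select * from table_two_pass"))

With USE_KETTLE removed, SINGLE_PASS is the only remaining switch: 'true' is generally understood to build the dictionary during the load itself via a dictionary server, while 'false' keeps the separate dictionary-generation pass.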
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 44bce5e..f827792 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -156,7 +156,6 @@
</environmentVariables>
<systemProperties>
<java.awt.headless>true</java.awt.headless>
- <use_kettle>${use.kettle}</use_kettle>
</systemProperties>
</configuration>
<executions>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2fc918d..7cb5ed4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvreaderstep.{BlockDetails, RddInpututilsForUpdate}
+import org.apache.carbondata.processing.csvload.BlockDetails
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
@@ -70,7 +70,6 @@ object CarbonDataRDDFactory {
alterTableModel: AlterTableModel,
carbonLoadModel: CarbonLoadModel,
storePath: String,
- kettleHomePath: String,
storeLocation: String): Unit = {
var compactionSize: Long = 0
var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
@@ -136,7 +135,6 @@ object CarbonDataRDDFactory {
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
storePath,
- kettleHomePath,
storeLocation,
compactionType,
carbonTable,
@@ -156,7 +154,6 @@ object CarbonDataRDDFactory {
startCompactionThreads(sqlContext,
carbonLoadModel,
storePath,
- kettleHomePath,
storeLocation,
compactionModel,
lock
@@ -179,7 +176,6 @@ object CarbonDataRDDFactory {
def handleCompactionForSystemLocking(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storePath: String,
- kettleHomePath: String,
storeLocation: String,
compactionType: CompactionType,
carbonTable: CarbonTable,
@@ -195,7 +191,6 @@ object CarbonDataRDDFactory {
startCompactionThreads(sqlContext,
carbonLoadModel,
storePath,
- kettleHomePath,
storeLocation,
compactionModel,
lock
@@ -232,7 +227,6 @@ object CarbonDataRDDFactory {
def startCompactionThreads(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storePath: String,
- kettleHomePath: String,
storeLocation: String,
compactionModel: CompactionModel,
compactionLock: ICarbonLock): Unit = {
@@ -264,7 +258,7 @@ object CarbonDataRDDFactory {
DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
storePath: String,
compactionModel: CompactionModel,
- executor, sqlContext, kettleHomePath, storeLocation
+ executor, sqlContext, storeLocation
)
triggeredCompactionStatus = true
} catch {
@@ -313,7 +307,7 @@ object CarbonDataRDDFactory {
DataManagementFunc.executeCompaction(newCarbonLoadModel,
newCarbonLoadModel.getStorePath,
newcompactionModel,
- executor, sqlContext, kettleHomePath, storeLocation
+ executor, sqlContext, storeLocation
)
} catch {
case e: Exception =>
@@ -361,10 +355,8 @@ object CarbonDataRDDFactory {
def loadCarbonData(sqlContext: SQLContext,
carbonLoadModel: CarbonLoadModel,
storePath: String,
- kettleHomePath: String,
columnar: Boolean,
partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
- useKettle: Boolean,
result: Future[DictionaryServer],
dataFrame: Option[DataFrame] = None,
updateModel: Option[UpdateTableModel] = None): Unit = {
@@ -406,7 +398,6 @@ object CarbonDataRDDFactory {
handleCompactionForSystemLocking(sqlContext,
carbonLoadModel,
storePath,
- kettleHomePath,
storeLocation,
CompactionType.MINOR_COMPACTION,
carbonTable,
@@ -424,7 +415,6 @@ object CarbonDataRDDFactory {
startCompactionThreads(sqlContext,
carbonLoadModel,
storePath,
- kettleHomePath,
storeLocation,
compactionModel,
lock
@@ -454,10 +444,6 @@ object CarbonDataRDDFactory {
try {
LOGGER.audit(s"Data load request has been received for table" +
s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- if (!useKettle) {
- LOGGER.audit("Data is loading with New Data Flow for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- }
// Check if any load need to be deleted before loading new data
DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
carbonLoadModel.getTableName, storePath, isForceDeletion = false)
@@ -565,8 +551,7 @@ object CarbonDataRDDFactory {
* 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
* 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
* for locally writing carbondata files(one file one block) in nodes
- * 4)use kettle: use DataFileLoaderRDD to load data and write to carbondata files
- * non kettle: use NewCarbonDataLoadRDD to load data and write to carbondata files
+ * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files
*/
val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
// FileUtils will skip file which is no csv, and return all file path which split by ','
@@ -631,27 +616,12 @@ object CarbonDataRDDFactory {
).toArray
}
- if (useKettle) {
- status = new DataFileLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storePath,
- kettleHomePath,
- columnar,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- blocksGroupBy,
- isTableSplitPartition
- ).collect()
- } else {
- status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- currentLoadCount,
- blocksGroupBy,
- isTableSplitPartition).collect()
- }
+ status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ currentLoadCount,
+ blocksGroupBy,
+ isTableSplitPartition).collect()
}
def loadDataFrame(): Unit = {
@@ -665,26 +635,13 @@ object CarbonDataRDDFactory {
sqlContext.sparkContext)
val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
- if (useKettle) {
- status = new DataFrameLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- storePath,
- kettleHomePath,
- columnar,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- newRdd).collect()
- } else {
- status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
- new DataLoadResultImpl(),
- carbonLoadModel,
- currentLoadCount,
- tableCreationTime,
- schemaLastUpdatedTime,
- newRdd).collect()
- }
+ status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
+ new DataLoadResultImpl(),
+ carbonLoadModel,
+ currentLoadCount,
+ tableCreationTime,
+ schemaLastUpdatedTime,
+ newRdd).collect()
} catch {
case ex: Exception =>
@@ -693,103 +650,6 @@ object CarbonDataRDDFactory {
}
}
- def loadDataFrameForUpdate(): Unit = {
- def triggerDataLoadForSegment(key: String,
- iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
- val rddResult = new updateResultImpl()
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
- var partitionID = "0"
- val loadMetadataDetails = new LoadMetadataDetails
- val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
- var uniqueLoadStatusId = ""
- try {
- val segId = key
- val taskNo = CarbonUpdateUtil
- .getLatestTaskIdForSegment(segId,
- CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
- carbonTable.getCarbonTableIdentifier))
- val index = taskNo + 1
- uniqueLoadStatusId = carbonLoadModel.getTableName +
- CarbonCommonConstants.UNDERSCORE +
- (index + "_0")
-
- // convert timestamp
- val timeStampInLong = updateModel.get.updatedTimeStamp + ""
- loadMetadataDetails.setPartitionCount(partitionID)
- loadMetadataDetails.setLoadName(segId)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
- carbonLoadModel.setPartitionId(partitionID)
- carbonLoadModel.setSegmentId(segId)
- carbonLoadModel.setTaskNo(String.valueOf(index))
- carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
-
- // During Block Spill case Increment of File Count and proper adjustment of Block
- // naming is only done when AbstractFactDataWriter.java : initializeWriter get
- // CarbondataFileName as null. For handling Block Spill not setting the
- // CarbondataFileName in case of Update.
- // carbonLoadModel.setCarbondataFileName(newBlockName)
-
- // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index)
- loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
- val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY +
- UUID.randomUUID().toString
-
- try {
- RddInpututilsForUpdate.put(rddIteratorKey,
- new RddIteratorForUpdate(iter, carbonLoadModel))
- carbonLoadModel.setRddIteratorKey(rddIteratorKey)
- CarbonDataLoadForUpdate
- .run(carbonLoadModel, index, storePath, kettleHomePath,
- segId, loadMetadataDetails, executionErrors)
- } finally {
- RddInpututilsForUpdate.remove(rddIteratorKey)
- }
- } catch {
- case e: Exception =>
- LOGGER.info("DataLoad failure")
- LOGGER.error(e)
- throw e
- }
-
- var finished = false
-
- override def hasNext: Boolean = !finished
-
- override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
- finished = true
- rddResult
- .getKey(uniqueLoadStatusId,
- (loadMetadataDetails, executionErrors))
- }
- }
- resultIter
- }
-
- val updateRdd = dataFrame.get.rdd
-
-
- val keyRDD = updateRdd.map(row =>
- // splitting as (key, value) i.e., (segment, updatedRows)
- (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))
- )
- val groupBySegmentRdd = keyRDD.groupByKey()
-
- val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, Array[String]] { p =>
- DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, p).map(_.host)
- }.distinct.size
- val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
- sqlContext.sparkContext)
- val groupBySegmentAndNodeRdd =
- new UpdateCoalescedRDD[(String, scala.Iterable[Row])](groupBySegmentRdd,
- nodes.distinct.toArray)
-
- res = groupBySegmentAndNodeRdd.map(x =>
- triggerDataLoadForSegment(x._1, x._2.toIterator).toList
- ).collect()
-
- }
-
if (!updateModel.isDefined) {
CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
@@ -798,9 +658,7 @@ object CarbonDataRDDFactory {
var errorMessage: String = "DataLoad failure"
var executorMessage: String = ""
try {
- if (updateModel.isDefined) {
- loadDataFrameForUpdate()
- } else if (dataFrame.isDefined) {
+ if (dataFrame.isDefined) {
loadDataFrame()
}
else {
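After these removals, loadCarbonData no longer takes kettleHomePath or useKettle. A hedged sketch of the new call shape, using only parameters visible in the signature above (dictionaryServerFuture is an assumed Future[DictionaryServer] available in the caller's context):

    CarbonDataRDDFactory.loadCarbonData(
      sqlContext,
      carbonLoadModel,
      storePath,
      columnar = true,
      partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
      result = dictionaryServerFuture, // Future[DictionaryServer], assumed available
      dataFrame = None,                // no DataFrame: load from CSV paths
      updateModel = None)              // plain load, not an update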
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 63cf9de..e6efeaa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -190,7 +190,6 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
| LOAD DATA INPATH '$csvFolder'
| INTO TABLE ${options.dbName}.${options.tableName}
| OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
- | 'USE_KETTLE' = '${options.useKettle}',
| 'SINGLE_PASS' = '${options.singlePass}')
""".stripMargin
}
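For a DataFrame with columns id and name saved to default.t1 from folder /tmp/csv_folder, the template above would now render roughly as (all values illustrative):

    LOAD DATA INPATH '/tmp/csv_folder'
    INTO TABLE default.t1
    OPTIONS ('FILEHEADER' = 'id,name',
      'SINGLE_PASS' = 'false')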
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 002b6f8..4bd0564 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -110,8 +110,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
carbonLoadModel.setStorePath(relation.tableMeta.storePath)
- val kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
-
var storeLocation = CarbonProperties.getInstance
.getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
System.getProperty("java.io.tmpdir")
@@ -123,7 +121,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
alterTableModel,
carbonLoadModel,
relation.tableMeta.storePath,
- kettleHomePath,
storeLocation
)
} catch {
@@ -388,29 +385,6 @@ case class LoadTable(
val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
- // TODO It will be removed after kettle is removed.
- val useKettle = options.get("use_kettle") match {
- case Some(value) => value.toBoolean
- case _ =>
- var useKettleLocal = System.getProperty("use_kettle")
- if (useKettleLocal == null && sparkSession.sparkContext.getConf.contains("use_kettle")) {
- useKettleLocal = sparkSession.sparkContext.getConf.get("use_kettle")
- }
- if (useKettleLocal == null) {
- useKettleLocal = CarbonProperties.getInstance().
- getProperty(CarbonCommonConstants.USE_KETTLE,
- CarbonCommonConstants.USE_KETTLE_DEFAULT)
- }
- try {
- useKettleLocal.toBoolean
- } catch {
- case e: Exception => CarbonCommonConstants.USE_KETTLE_DEFAULT.toBoolean
- }
- }
-
- val kettleHomePath =
- if (useKettle) CarbonScalaUtil.getKettleHome(sparkSession.sqlContext) else ""
-
val delimiter = options.getOrElse("delimiter", ",")
val quoteChar = options.getOrElse("quotechar", "\"")
val fileHeader = options.getOrElse("fileheader", "")
@@ -455,11 +429,11 @@ case class LoadTable(
DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
case "true" =>
- if (!useKettle && StringUtils.isEmpty(allDictionaryPath)) {
+ if (StringUtils.isEmpty(allDictionaryPath)) {
true
} else {
LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
- "can not be used together, and USE_KETTLE must be set as false")
+ "can not be used together")
false
}
case "false" =>
@@ -539,10 +513,8 @@ case class LoadTable(
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
- kettleHomePath,
columnar,
partitionStatus,
- useKettle,
result,
dataFrame,
updateModel)
@@ -590,10 +562,8 @@ case class LoadTable(
CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
carbonLoadModel,
relation.tableMeta.storePath,
- kettleHomePath,
columnar,
partitionStatus,
- useKettle,
result,
loadDataFrame,
updateModel)
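The remaining SINGLE_PASS validation in this file boils down to: the option may be enabled only when no ALL_DICTIONARY_PATH is supplied. A condensed sketch under that reading (the fall-through case for unrecognized values is assumed):

    val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
      case "true" if StringUtils.isEmpty(allDictionaryPath) => true
      case "true" =>
        LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
          " can not be used together")
        false
      case _ => false // "false" and anything unrecognized disable single pass
    }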
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index 15d5597..e94b6ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -38,7 +38,6 @@ object Spark2TestQueryExecutor {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
LOGGER.info("use TestQueryExecutorImplV2")
CarbonProperties.getInstance()
- .addProperty("carbon.kettle.home", TestQueryExecutor.kettleHome)
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat)
.addProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
System.getProperty("java.io.tmpdir"))
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index bcc82ce..324d3a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -78,12 +78,6 @@ object TableLoader {
System.out.println(s"table name: $dbName.$tableName")
val inputPaths = TableAPIUtil.escape(args(2))
- val kettleHome = CarbonProperties.getInstance().getProperty("carbon.kettle.home")
- if (kettleHome == null) {
- CarbonProperties.getInstance().addProperty("carbon.kettle.home",
- map.getOrElse("carbon.kettle.home", ""))
- }
-
val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
CarbonEnv.init(spark)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 33f710b..642b330 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -54,7 +54,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t4")
""")
LoadTable(Some("default"), "t4", s"$resourcesPath/source.csv", Nil,
- Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+ Map()).run(sqlContext.sparkSession)
val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4")
if (table != null && table.getBucketingInfo("t4") != null) {
assert(true)
@@ -90,7 +90,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
OPTIONS("tableName"="t5")
""")
LoadTable(Some("default"), "t5", s"$resourcesPath/source.csv", Nil,
- Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+ Map()).run(sqlContext.sparkSession)
val plan = sql(
"""
@@ -115,7 +115,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t6")
""")
LoadTable(Some("default"), "t6", s"$resourcesPath/source.csv", Nil,
- Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+ Map()).run(sqlContext.sparkSession)
val plan = sql(
"""
@@ -140,7 +140,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t7")
""")
LoadTable(Some("default"), "t7", s"$resourcesPath/source.csv", Nil,
- Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+ Map()).run(sqlContext.sparkSession)
sql("DROP TABLE IF EXISTS bucketed_parquet_table")
sql("select * from t7").write
@@ -171,7 +171,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t8")
""")
LoadTable(Some("default"), "t8", s"$resourcesPath/source.csv", Nil,
- Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+ Map()).run(sqlContext.sparkSession)
sql("DROP TABLE IF EXISTS parquet_table")
sql("select * from t8").write
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
index ceb340e..55eaa20 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
@@ -44,7 +44,7 @@ class VectorReaderTestCase extends QueryTest with BeforeAndAfterAll {
OPTIONS("tableName"="vectorreader")
""")
LoadTable(Some("default"), "vectorreader", s"$resourcesPath/source.csv", Nil,
- Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+ Map()).run(sqlContext.sparkSession)
}
test("test vector reader") {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 95a3ff4..a132d6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,8 +105,6 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<snappy.version>1.1.2.6</snappy.version>
<hadoop.version>2.2.0</hadoop.version>
- <kettle.version>4.4.0-stable</kettle.version>
- <use.kettle>false</use.kettle>
<hadoop.deps.scope>compile</hadoop.deps.scope>
<spark.deps.scope>compile</spark.deps.scope>
<scala.deps.scope>compile</scala.deps.scope>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/kettle.properties
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/kettle.properties b/processing/carbonplugins/.kettle/kettle.properties
deleted file mode 100644
index 9bae2e3..0000000
--- a/processing/carbonplugins/.kettle/kettle.properties
+++ /dev/null
@@ -1,10 +0,0 @@
-# This file was generated by Pentaho Data Integration version 4.2.1.
-#
-# Here are a few examples of variables to set:
-#
-# PRODUCTION_SERVER = hercules
-# TEST_SERVER = zeus
-# DEVELOPMENT_SERVER = thor
-#
-# Note: lines like these with a # in front of it are comments
-#
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml
deleted file mode 100644
index 2ae5a87..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonAggSurrogateGenerator"
- description="Carbon Agg Surrogate Generator"
- tooltip="Carbon Agg Surrogate Generator"
- category="Carbon"
- classname="org.apache.carbondata.processing.aggregatesurrogategenerator.step.CarbonAggregateSurrogateGeneratorMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml
deleted file mode 100644
index fc3cdf5..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonAutoAggGraphGenerator"
- description="Carbon Auto Agg Graph Generator"
- tooltip="Carbon Auto Agg Graph Generator"
- category="Carbon"
- classname="org.apache.carbondata.processing.autoaggregategraphgenerator.step.CarbonAutoAGGGraphGeneratorMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml
deleted file mode 100644
index a0a2192..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonAutoAggSliceMerger"
- description="Carbon Auto Agg Slice Merger"
- tooltip="Carbon Auto Agg Slice Merger"
- category="Carbon"
- classname="org.apache.carbondata.processing.merger.step.autoaggregate.CarbonAutoAggregateSliceMergerMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml
deleted file mode 100644
index 91617c5..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
- id="CarbonCSVBasedSurrogateGen"
- description="Carbon CSV Based Surrogate Key Generator"
- tooltip="Carbon CSV based Surrogate Key Generator"
- category="Carbon"
- classname="org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedSeqGenMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml
deleted file mode 100644
index 7f33b5e..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
- id="CSVReader"
- description="Carbon CSVReader"
- tooltip="Carbon CSVReader"
- category="Carbon"
- classname="org.apache.carbondata.processing.csvreader.CsvReaderMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml
deleted file mode 100644
index 7173171..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
- id="CarbonCSVReaderStep"
- description="Carbon CSV Reader Step"
- tooltip="Carbon CSV Reader Step"
- category="Carbon"
- classname="org.apache.carbondata.processing.csvreaderstep.CsvInputMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml
deleted file mode 100644
index 92631a1..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonDataWriter"
- description="Carbon Data Writer"
- tooltip="Carbon Data Writer"
- category="Carbon"
- classname="org.apache.carbondata.processing.store.CarbonDataWriterStepMeta">
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml
deleted file mode 100644
index 270bbe3..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonFactReader"
- description="Carbon Fact Reader"
- tooltip="Carbon Fact Reader"
- category="Carbon"
- classname="org.apache.carbondata.processing.factreader.step.CarbonFactReaderMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml
deleted file mode 100644
index 087589e..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonGroupBy"
- description="Carbon Group By"
- tooltip="Carbon Group By"
- category="Carbon"
- classname="org.apache.carbondata.processing.groupby.step.CarbonGroupByStepMeta">
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml
deleted file mode 100644
index 550531c..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonInMemoryFactReader"
- description="Carbon InMemory Fact Reader"
- tooltip="Carbon InMemory Fact Reader"
- category="Carbon"
- classname="org.apache.carbondata.processing.factreader.inmemory.step.CarbonInMemoryFactReaderMeta">
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml
deleted file mode 100644
index 517b757..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
- id="CarbonSurrogateGen"
- description="Carbon Surrogate Key Generator"
- tooltip="Carbon Surrogate Key Generator"
- category="Carbon"
- classname="org.apache.carbondata.processing.surrogatekeysgenerator.dbbased.CarbonSeqGenStepMeta">
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml
deleted file mode 100644
index 618dba0..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
- id="CarbonSliceMerger"
- description="Carbon Slice Merger"
- tooltip="Carbon Slice Merger"
- category="Carbon"
- classname="org.apache.carbondata.processing.merger.step.CarbonSliceMergerStepMeta">
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml
deleted file mode 100644
index 970a855..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="CarbonSortKeyGroupBy"
- description="Carbon Sort And Group By"
- tooltip="Carbon Sort And Group By"
- category="Carbon"
- classname="org.apache.carbondata.processing.sortandgroupby.step.CarbonSortKeyAndGroupByStepMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml
deleted file mode 100644
index e0892a4..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="MDKeyGen"
- description="MD Key Gen"
- tooltip="MD Key Gen"
- category="Carbon"
- classname="org.apache.carbondata.processing.mdkeygen.MDKeyGenStepMeta">
-
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml
deleted file mode 100644
index d08d0ee..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied. See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
- id="SortKey"
- description="Carbon Sort"
- tooltip="Carbon Sort"
- category="Carbon"
- classname="org.apache.carbondata.processing.sortandgroupby.sortdatastep.SortKeyStepMeta">
-</plugin>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/pom.xml
----------------------------------------------------------------------
diff --git a/processing/pom.xml b/processing/pom.xml
index 096f49a..57b0908 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -49,37 +49,11 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
- <dependency>
- <groupId>pentaho-kettle</groupId>
- <artifactId>kettle-engine</artifactId>
- <version>${kettle.version}</version>
- </dependency>
<dependency>
<groupId>com.univocity</groupId>
<artifactId>univocity-parsers</artifactId>
<version>1.5.6</version>
</dependency>
- <dependency>
- <groupId>pentaho-kettle</groupId>
- <artifactId>kettle-core</artifactId>
- <version>${kettle.version}</version>
- </dependency>
- <dependency>
- <groupId>pentaho-kettle</groupId>
- <artifactId>kettle-db</artifactId>
- <version>${kettle.version}</version>
- </dependency>
- <dependency>
- <groupId>commons-vfs</groupId>
- <artifactId>commons-vfs</artifactId>
- <version>1.0</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
new file mode 100644
index 0000000..d6d214b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.csvload;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * blocks info
+ */
+public class BlockDetails extends FileSplit implements Serializable {
+
+ /**
+ * serialization version
+ */
+ private static final long serialVersionUID = 2293906691860002339L;
+ //block offset
+ private long blockOffset;
+ //block length
+ private long blockLength;
+ //file path which block belong to
+ private String filePath;
+ // locations where this block exists
+ private String[] locations;
+
+ public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
+ super(filePath, blockOffset, blockLength, locations);
+ this.filePath = filePath.toString();
+ this.blockOffset = blockOffset;
+ this.blockLength = blockLength;
+ this.locations = locations;
+ }
+
+ public long getBlockOffset() {
+ return blockOffset;
+ }
+
+ public long getBlockLength() {
+ return blockLength;
+ }
+
+ public String getFilePath() {
+ return FileFactory.getUpdatedFilePath(filePath);
+ }
+
+ public void setFilePath(String filePath) {
+ this.filePath = filePath;
+ }
+
+ public String[] getLocations() {
+ return locations;
+ }
+
+ /** The file containing this split's data. */
+ @Override
+ public Path getPath() { return new Path(filePath); }
+
+ /** The position of the first byte in the file to process. */
+ @Override
+ public long getStart() { return blockOffset; }
+
+ /** The number of bytes in the file to process. */
+ @Override
+ public long getLength() { return blockLength; }
+}
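The relocated BlockDetails doubles as a serializable Hadoop FileSplit, so it can travel with Spark tasks while still feeding the mapreduce input machinery. A minimal construction sketch against the constructor above (path, offset, length and hosts are hypothetical):

    import org.apache.hadoop.fs.Path
    import org.apache.carbondata.processing.csvload.BlockDetails

    val block = new BlockDetails(
      new Path("hdfs://nn/data/part-00000.csv"), // file the block belongs to
      0L,                                        // blockOffset: first byte of this split
      134217728L,                                // blockLength: number of bytes to process
      Array("host1", "host2"))                   // preferred locations
    // the FileSplit accessors delegate to the stored fields
    assert(block.getStart == block.getBlockOffset)
    assert(block.getLength == block.getBlockLength)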
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
deleted file mode 100644
index 7e6f6f4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
+++ /dev/null
@@ -1,475 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.csvload;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.constants.DataProcessorConstants;
-import org.apache.carbondata.processing.csvreaderstep.CsvInputMeta;
-import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
-import org.apache.carbondata.processing.etl.DataLoadingException;
-import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.pentaho.di.core.KettleEnvironment;
-import org.pentaho.di.core.exception.KettleException;
-import org.pentaho.di.core.logging.LogLevel;
-import org.pentaho.di.core.logging.LoggingObjectInterface;
-import org.pentaho.di.core.logging.LoggingRegistry;
-import org.pentaho.di.core.xml.XMLHandlerCache;
-import org.pentaho.di.trans.Trans;
-import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.StepMeta;
-import org.pentaho.di.trans.steps.getfilenames.GetFileNamesMeta;
-import org.pentaho.di.trans.steps.hadoopfileinput.HadoopFileInputMeta;
-import org.pentaho.di.trans.steps.textfileinput.TextFileInputField;
-
-public class DataGraphExecuter {
- /**
- * Logger for this class.
- */
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DataGraphExecuter.class.getName());
- /**
- * graph transformation object
- */
- private Trans trans;
- /**
- * status model describing the data load being executed
- */
- private IDataProcessStatus model;
-
- public DataGraphExecuter(IDataProcessStatus model) {
- this.model = model;
- }
-
- /**
- * This method checks whether the column names in the provided CSV file and in the
- * schema are the same.
- *
- * @param columnNames
- * @param csvFilePath
- * @return true if same, false otherwise.
- */
- private boolean checkCSVAndRequestedTableColumns(String[] columnNames, String csvFilePath,
- String delimiter) throws IOException {
- return GraphExecutionUtil.checkCSVAndRequestedTableColumns(csvFilePath, columnNames, delimiter);
- }
-
- /**
- * This method returns the column names from the schema.
- *
- * @param tableName
- * @return column names array.
- */
- private String[] getColumnNames(String tableName, CarbonDataLoadSchema schema) {
- Set<String> columnNames = CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName);
- return columnNames.toArray(new String[columnNames.size()]);
- }
-
- private void validateCSV(String tableName, CarbonFile f, CarbonDataLoadSchema schema,
- String delimiter) throws DataLoadingException, IOException {
-
- String[] columnNames = getColumnNames(tableName, schema);
-
- if (!checkCSVAndRequestedTableColumns(columnNames, f.getAbsolutePath(), delimiter)) {
- LOGGER.error(
- "CSV File provided is not proper. Column names in schema and csv header are not same. "
- + "CSVFile Name : "
- + f.getName());
- throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
- "CSV File provided is not proper. Column names in schema and csv header are not same. "
- + "CSVFile Name : "
- + f.getName());
- }
- }
-
- public void executeGraph(String graphFilePath, SchemaInfo schemaInfo, CarbonDataLoadSchema schema)
- throws DataLoadingException {
-
- // This method validates both the fact and dimension CSV files.
- if (!schemaInfo.isAutoAggregateRequest() && model.getRddIteratorKey() == null) {
- validateCSVFiles(schema);
- }
- execute(graphFilePath, schemaInfo);
- }
-
- /**
- * Executes the generated Kettle graph.
- *
- * @throws DataLoadingException
- */
-
- private void execute(String graphFilePath, SchemaInfo schemaInfo)
- throws DataLoadingException {
-
- // Build the Kettle transformation from the graph file and execute it.
-
- initKettleEnv();
- TransMeta transMeta = null;
- try {
- transMeta = new TransMeta(graphFilePath);
- transMeta.setFilename(graphFilePath);
- trans = new Trans(transMeta);
- if (!schemaInfo.isAutoAggregateRequest()) {
- // Register HDFS as a file system type with VFS to make HadoopFileInputMeta work
- boolean hdfsReadMode =
- model.getCsvFilePath() != null && model.getCsvFilePath().startsWith("hdfs:");
- trans.setVariable("modifiedDimNames", model.getDimTables());
- trans.setVariable("csvInputFilePath", model.getCsvFilePath());
- trans.setVariable(CarbonCommonConstants.BAD_RECORD_KEY, null);
- if (hdfsReadMode) {
- trans.addParameterDefinition("vfs.hdfs.dfs.client.read.shortcircuit", "true", "");
- trans.addParameterDefinition("vfs.hdfs.dfs.domain.socket.path",
- "/var/lib/hadoop-hdfs-new/dn_socket", "");
- trans.addParameterDefinition("vfs.hdfs.dfs.block.local-path-access.user", "hadoop,root",
- "");
- trans.addParameterDefinition("vfs.hdfs.io.file.buffer.size", "5048576", "");
- }
- List<StepMeta> stepsMeta = trans.getTransMeta().getSteps();
- StringBuilder builder = new StringBuilder();
- StringBuilder measuresInCSVFile = new StringBuilder();
- processCsvInputMeta(stepsMeta, builder, measuresInCSVFile);
- processGetFileNamesMeta(stepsMeta);
-
- processHadoopFileInputMeta(stepsMeta, builder, measuresInCSVFile);
- }
- setGraphLogLevel();
- trans.execute(null);
- LOGGER.info("Graph execution is started " + graphFilePath);
- trans.waitUntilFinished();
- LOGGER.info("Graph execution is finished.");
- } catch (KettleException | IOException e) {
- LOGGER.error(e, "Unable to start execution of graph " + e.getMessage());
- throw new DataLoadingException("Unable to start execution of graph ", e);
- }
-
- //Don't change the logic of creating key
- String key = model.getDatabaseName() + '/' + model.getTableName() + '_' + model.getTableName();
-
- if (trans.getErrors() > 0) {
- if (null != trans.getVariable(CarbonCommonConstants.BAD_RECORD_KEY)) {
- LOGGER.error(trans.getVariable(CarbonCommonConstants.BAD_RECORD_KEY));
- throw new DataLoadingException(
- "Data load failed due to bad record ," + trans
- .getVariable(CarbonCommonConstants.BAD_RECORD_KEY));
- }
- LOGGER.error("Graph Execution had errors");
- throw new DataLoadingException("Due to internal errors, please check logs for more details.");
- } else if (null != BadRecordsLogger.hasBadRecord(key)) {
- LOGGER.error("Data load is partially success");
- throw new DataLoadingException(DataProcessorConstants.BAD_REC_FOUND,
- "Data load is partially success");
- } else {
- LOGGER.info("Graph execution task is over with No error.");
- }
- LoggingRegistry instance = LoggingRegistry.getInstance();
- Map<String, LoggingObjectInterface> map = instance.getMap();
- if (null != map) {
- for (Entry<String, LoggingObjectInterface> entry : map.entrySet()) {
- instance.removeIncludingChildren(entry.getKey());
- }
- }
-
- map = null;
- XMLHandlerCache.getInstance().clear();
- trans.cleanup();
- trans.eraseParameters();
- trans.killAll();
- trans = null;
- }
-
- /**
- * @param stepsMeta
- * @param builder
- * @param measuresInCSVFile
- * @throws DataLoadingException
- */
- private void processHadoopFileInputMeta(List<StepMeta> stepsMeta, StringBuilder builder,
- StringBuilder measuresInCSVFile) throws DataLoadingException {
- for (StepMeta step : stepsMeta) {
- if (step.getStepMetaInterface() instanceof HadoopFileInputMeta) {
-
- HadoopFileInputMeta stepMetaInterface = (HadoopFileInputMeta) step.getStepMetaInterface();
- if (null != model.getCsvFilePath()) {
- stepMetaInterface.setFilenameField("filename");
- stepMetaInterface.setFileName(new String[] { "${csvInputFilePath}" });
- stepMetaInterface.setDefault();
- stepMetaInterface.setEncoding(CarbonCommonConstants.DEFAULT_CHARSET);
- stepMetaInterface.setEnclosure("\"");
- stepMetaInterface.setHeader(true);
- stepMetaInterface.setSeparator(",");
- stepMetaInterface.setAcceptingFilenames(true);
- stepMetaInterface.setAcceptingStepName("getFileNames");
- stepMetaInterface.setFileFormat("mixed");
- stepMetaInterface.setAcceptingField("filename");
-
- CarbonFile csvFileToRead = GraphExecutionUtil.getCsvFileToRead(model.getCsvFilePath());
- TextFileInputField[] inputFields = GraphExecutionUtil
- .getTextInputFiles(csvFileToRead, builder, measuresInCSVFile, ",");
- stepMetaInterface.setInputFields(inputFields);
- } else if (model.isDirectLoad()) {
- String[] files = new String[model.getFilesToProcess().size()];
- int i = 0;
- for (String file : model.getFilesToProcess()) {
- files[i++] = file;
- }
- stepMetaInterface.setFileName(files);
- stepMetaInterface.setFilenameField("filename");
- stepMetaInterface.setDefault();
- stepMetaInterface.setEncoding(CarbonCommonConstants.DEFAULT_CHARSET);
- stepMetaInterface.setEnclosure("\"");
- stepMetaInterface.setHeader(true);
- stepMetaInterface.setSeparator(",");
- stepMetaInterface.setAcceptingFilenames(true);
- stepMetaInterface.setAcceptingStepName("getFileNames");
- stepMetaInterface.setFileFormat("mixed");
- stepMetaInterface.setAcceptingField("filename");
-
- if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
- TextFileInputField[] inputParams = GraphExecutionUtil
- .getTextInputFiles(model.getCsvHeader(), builder, measuresInCSVFile, ",");
- ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputParams);
- ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
- ((CsvInputMeta) step.getStepMetaInterface())
- .setEscapeCharacter(model.getEscapeCharacter());
- ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(false);
-
- } else if (model.getFilesToProcess().size() > 0) {
- CarbonFile csvFile =
- GraphExecutionUtil.getCsvFileToRead(model.getFilesToProcess().get(0));
- TextFileInputField[] inputFields = GraphExecutionUtil
- .getTextInputFiles(csvFile, builder, measuresInCSVFile,
- model.getCsvDelimiter());
- ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
- ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
- ((CsvInputMeta) step.getStepMetaInterface())
- .setEscapeCharacter(model.getEscapeCharacter());
- ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(true);
- }
- }
-
- break;
- }
- }
- }
-
- /**
- * @param stepsMeta
- * @throws IOException
- */
- private void processGetFileNamesMeta(List<StepMeta> stepsMeta) throws IOException {
- for (StepMeta step : stepsMeta) {
- if (step.getStepMetaInterface() instanceof GetFileNamesMeta) {
- GetFileNamesMeta stepMetaInterface = (GetFileNamesMeta) step.getStepMetaInterface();
- if (null != model.getCsvFilePath()) {
- boolean checkIsFolder = GraphExecutionUtil.checkIsFolder(model.getCsvFilePath());
- if (checkIsFolder) {
- stepMetaInterface.setFileName(new String[] { model.getCsvFilePath() });
- stepMetaInterface.setFileMask(new String[] { ".*\\.csv$|.*\\.inprogress" });
- stepMetaInterface.setExcludeFileMask(new String[] { "1" });
- } else {
- // If an absolute file path was provided and a previous load was stopped in
- // between, the CSV file may have been renamed with the inprogress extension,
- // so on restart we also need to check for a .csv.inprogress file.
-
- FileType fileType = FileFactory.getFileType(model.getCsvFilePath());
-
- boolean exists = FileFactory.isFileExist(model.getCsvFilePath(), fileType);
-
- if (exists) {
- stepMetaInterface.setFileName(new String[] { model.getCsvFilePath() });
- stepMetaInterface.setExcludeFileMask(new String[] { null });
- } else {
- stepMetaInterface.setFileName(new String[] {
- model.getCsvFilePath() + CarbonCommonConstants.FILE_INPROGRESS_STATUS });
- stepMetaInterface.setExcludeFileMask(new String[] { null });
- }
- }
- } else if (model.isDirectLoad()) {
- String[] files = new String[model.getFilesToProcess().size()];
- int i = 0;
- for (String file : model.getFilesToProcess()) {
- files[i++] = file;
- }
- stepMetaInterface.setFileName(files);
- }
- break;
- }
- }
- }
-
- /**
- * @param stepsMeta
- * @param builder
- * @param measuresInCSVFile
- * @throws DataLoadingException
- */
- private void processCsvInputMeta(List<StepMeta> stepsMeta, StringBuilder builder,
- StringBuilder measuresInCSVFile) throws DataLoadingException {
- for (StepMeta step : stepsMeta) {
- if (step.getStepMetaInterface() instanceof CsvInputMeta) {
- if (null != model.getCsvFilePath() && model.getRddIteratorKey() == null) {
- CarbonFile csvFileToRead = GraphExecutionUtil.getCsvFileToRead(model.getCsvFilePath());
- TextFileInputField[] inputFields = GraphExecutionUtil
- .getTextInputFiles(csvFileToRead, builder, measuresInCSVFile, ",");
- ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
- } else if (model.isDirectLoad()) {
- if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
- TextFileInputField[] inputFields = GraphExecutionUtil
- .getTextInputFiles(model.getCsvHeader(), builder, measuresInCSVFile, ",");
- ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
- ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
- ((CsvInputMeta) step.getStepMetaInterface())
- .setEscapeCharacter(model.getEscapeCharacter());
- ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(false);
-
- } else if (model.getFilesToProcess().size() > 0) {
- CarbonFile csvFileToRead =
- GraphExecutionUtil.getCsvFileToRead(model.getFilesToProcess().get(0));
- TextFileInputField[] inputFields = GraphExecutionUtil
- .getTextInputFiles(csvFileToRead, builder, measuresInCSVFile,
- model.getCsvDelimiter());
- ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
- ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
- ((CsvInputMeta) step.getStepMetaInterface())
- .setEscapeCharacter(model.getEscapeCharacter());
- ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(true);
- }
- }
- break;
- }
- }
- }
-
- /**
- * Initializes the Kettle environment before executing the graph.
- */
- private void initKettleEnv() {
- try {
- KettleEnvironment.init(false);
- LOGGER.info("Kettle environment initialized");
- } catch (KettleException ke) {
- LOGGER.error("Unable to initialize Kettle Environment " + ke.getMessage());
- }
- }
-
-
- private void setGraphLogLevel() {
- trans.setLogLevel(LogLevel.NOTHING);
- }
-
- /**
- * This method validates both the fact and dimension CSV files.
- *
- * @param schema
- * @throws DataLoadingException
- */
- private void validateCSVFiles(CarbonDataLoadSchema schema) throws DataLoadingException {
- // Validate the Fact CSV Files.
- String csvFilePath = model.getCsvFilePath();
- if (csvFilePath != null) {
- FileType fileType = FileFactory.getFileType(csvFilePath);
- try {
- boolean exists = FileFactory.isFileExist(csvFilePath, fileType);
- if (exists && FileFactory.getCarbonFile(csvFilePath, fileType).isDirectory()) {
- CarbonFile fileDir = FileFactory.getCarbonFile(csvFilePath, fileType);
- CarbonFile[] listFiles = fileDir.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile pathname) {
- if (pathname.getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || pathname
- .getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
- + CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
- return true;
- }
- return false;
- }
- });
-
- for (CarbonFile f : listFiles) {
- validateCSV(model.getTableName(), f, schema, ",");
- }
- } else {
-
- if (!(csvFilePath.endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || csvFilePath
- .endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
- + CarbonCommonConstants.FILE_INPROGRESS_STATUS))) {
- LOGGER.error("File provided is not proper, Only csv files are allowed." + csvFilePath);
- throw new DataLoadingException(
- "File provided is not proper, Only csv files are allowed." + csvFilePath);
- }
-
- if (exists) {
- validateCSV(model.getTableName(),
- FileFactory.getCarbonFile(csvFilePath, fileType), schema, ",");
- } else {
- validateCSV(model.getTableName(), FileFactory
- .getCarbonFile(csvFilePath + CarbonCommonConstants.FILE_INPROGRESS_STATUS,
- fileType), schema, ",");
- }
-
- }
-
- } catch (IOException e) {
- LOGGER.error(e,
- "Error while checking file exists" + csvFilePath);
- }
- } else if (model.isDirectLoad()) {
- if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
- if (!CarbonDataProcessorUtil
- .isHeaderValid(model.getTableName(), model.getCsvHeader(), schema, ",")) {
- LOGGER.error("CSV header provided in DDL is not proper."
- + " Column names in schema and CSV header are not the same.");
- throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
- "CSV header provided in DDL is not proper. Column names in schema and CSV header are "
- + "not the same.");
- }
- } else {
- for (String file : model.getFilesToProcess()) {
- try {
- FileFactory.FileType fileType = FileFactory.getFileType(file);
- if (FileFactory.isFileExist(file, fileType)) {
- validateCSV(model.getTableName(),
- FileFactory.getCarbonFile(file, fileType), schema,
- model.getCsvDelimiter());
- }
- } catch (IOException e) {
- LOGGER.error(e,
- "Error while checking file exists" + file);
- }
- }
- }
- }
- }
-
-}
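The header validation the deleted executer performed, delegating to GraphExecutionUtil.checkCSVAndRequestedTableColumns, broadly amounts to checking that the schema's column names and the CSV header agree. A minimal standalone sketch of one such containment check (HeaderCheck and headerMatchesSchema are illustrative names, not CarbonData APIs):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.regex.Pattern;

    public final class HeaderCheck {
      // returns true when every schema column is present in the CSV header line
      static boolean headerMatchesSchema(String headerLine, String delimiter,
          Set<String> schemaColumns) {
        Set<String> csvColumns = new HashSet<>();
        for (String col : headerLine.split(Pattern.quote(delimiter))) {
          csvColumns.add(col.trim().toLowerCase());
        }
        for (String col : schemaColumns) {
          if (!csvColumns.contains(col.toLowerCase())) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args) {
        Set<String> schema = new HashSet<>(Arrays.asList("ID", "CUST_ID", "cust_name"));
        System.out.println(headerMatchesSchema("ID,CUST_ID,cust_name", ",", schema)); // true
        System.out.println(headerMatchesSchema("ID,name", ",", schema));              // false
      }
    }

A mismatch here is what produced the "Column names in schema and csv header are not same" failure above; the non-Kettle load path presumably performs an equivalent check via CarbonDataProcessorUtil.isHeaderValid.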