You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/03/30 05:12:18 UTC

[10/13] incubator-carbondata git commit: Removed kettle related code and refactored

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
index 885a0f0..851f7e9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/badrecordloger/BadRecordLoggerSharedDictionaryTest.scala
@@ -69,18 +69,6 @@ class BadRecordLoggerSharedDictionaryTest extends QueryTest with BeforeAndAfterA
       sql(
         s"""LOAD DATA INPATH '$csvFilePath' INTO TABLE testdrive OPTIONS('DELIMITER'=',',
             |'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL',
-            |'FILEHEADER'= 'ID,CUST_ID,cust_name', 'USE_KETTLE' = 'TRUE')""".stripMargin)
-    } catch {
-      case e: Throwable =>
-        assert(e.getMessage.contains("Data load failed due to bad record"))
-    }
-  }
-
-  test("dataload with no kettle") {
-    try {
-      sql(
-        s"""LOAD DATA INPATH '$csvFilePath' INTO TABLE testdrive OPTIONS('DELIMITER'=',',
-            |'QUOTECHAR'= '"', 'BAD_RECORDS_LOGGER_ENABLE'='TRUE', 'BAD_RECORDS_ACTION'='FAIL',
             |'FILEHEADER'= 'ID,CUST_ID,cust_name')""".stripMargin)
     } catch {
       case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
index 8a9d8af..1d456d3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
@@ -41,7 +41,7 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""
         |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_two_pass
-        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='false')
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
       """.stripMargin)
 
     sql(
@@ -54,7 +54,7 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""
         |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass
-        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true')
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
       """.stripMargin)
   }
 
@@ -75,7 +75,7 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""
         |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass_2
-        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true', 'COLUMNDICT'=
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true', 'COLUMNDICT'=
         |'country:$resourcesPath/columndictionary/country.csv, name:$resourcesPath/columndictionary/name.csv')
       """.stripMargin)
 
@@ -89,12 +89,12 @@ class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
     sql(
       s"""
         |LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_two_pass
-        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='false')
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
       """.stripMargin)
     sql(
       s"""
         |LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_one_pass
-        |OPTIONS('DELIMITER'= ',', 'USE_KETTLE'='false', 'SINGLE_PASS'='true')
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
       """.stripMargin)
 
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 44bce5e..f827792 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -156,7 +156,6 @@
           </environmentVariables>
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
-            <use_kettle>${use.kettle}</use_kettle>
           </systemProperties>
         </configuration>
         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2fc918d..7cb5ed4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -48,7 +48,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvreaderstep.{BlockDetails, RddInpututilsForUpdate}
+import org.apache.carbondata.processing.csvload.BlockDetails
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
@@ -70,7 +70,6 @@ object CarbonDataRDDFactory {
       alterTableModel: AlterTableModel,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
-      kettleHomePath: String,
       storeLocation: String): Unit = {
     var compactionSize: Long = 0
     var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
@@ -136,7 +135,6 @@ object CarbonDataRDDFactory {
       handleCompactionForSystemLocking(sqlContext,
         carbonLoadModel,
         storePath,
-        kettleHomePath,
         storeLocation,
         compactionType,
         carbonTable,
@@ -156,7 +154,6 @@ object CarbonDataRDDFactory {
           startCompactionThreads(sqlContext,
             carbonLoadModel,
             storePath,
-            kettleHomePath,
             storeLocation,
             compactionModel,
             lock
@@ -179,7 +176,6 @@ object CarbonDataRDDFactory {
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
-      kettleHomePath: String,
       storeLocation: String,
       compactionType: CompactionType,
       carbonTable: CarbonTable,
@@ -195,7 +191,6 @@ object CarbonDataRDDFactory {
         startCompactionThreads(sqlContext,
           carbonLoadModel,
           storePath,
-          kettleHomePath,
           storeLocation,
           compactionModel,
           lock
@@ -232,7 +227,6 @@ object CarbonDataRDDFactory {
   def startCompactionThreads(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
-      kettleHomePath: String,
       storeLocation: String,
       compactionModel: CompactionModel,
       compactionLock: ICarbonLock): Unit = {
@@ -264,7 +258,7 @@ object CarbonDataRDDFactory {
             DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
               storePath: String,
               compactionModel: CompactionModel,
-              executor, sqlContext, kettleHomePath, storeLocation
+              executor, sqlContext, storeLocation
             )
             triggeredCompactionStatus = true
           } catch {
@@ -313,7 +307,7 @@ object CarbonDataRDDFactory {
                 DataManagementFunc.executeCompaction(newCarbonLoadModel,
                   newCarbonLoadModel.getStorePath,
                   newcompactionModel,
-                  executor, sqlContext, kettleHomePath, storeLocation
+                  executor, sqlContext, storeLocation
                 )
               } catch {
                 case e: Exception =>
@@ -361,10 +355,8 @@ object CarbonDataRDDFactory {
   def loadCarbonData(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
-      kettleHomePath: String,
       columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
-      useKettle: Boolean,
       result: Future[DictionaryServer],
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
@@ -406,7 +398,6 @@ object CarbonDataRDDFactory {
           handleCompactionForSystemLocking(sqlContext,
             carbonLoadModel,
             storePath,
-            kettleHomePath,
             storeLocation,
             CompactionType.MINOR_COMPACTION,
             carbonTable,
@@ -424,7 +415,6 @@ object CarbonDataRDDFactory {
               startCompactionThreads(sqlContext,
                 carbonLoadModel,
                 storePath,
-                kettleHomePath,
                 storeLocation,
                 compactionModel,
                 lock
@@ -454,10 +444,6 @@ object CarbonDataRDDFactory {
     try {
       LOGGER.audit(s"Data load request has been received for table" +
           s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      if (!useKettle) {
-        LOGGER.audit("Data is loading with New Data Flow for table " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      }
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
         carbonLoadModel.getTableName, storePath, isForceDeletion = false)
@@ -565,8 +551,7 @@ object CarbonDataRDDFactory {
            * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
            * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
            *   for locally writing carbondata files(one file one block) in nodes
-           * 4)use kettle: use DataFileLoaderRDD to load data and write to carbondata files
-           *   non kettle: use NewCarbonDataLoadRDD to load data and write to carbondata files
+           * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files
            */
           val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
           // FileUtils will skip file which is no csv, and return all file path which split by ','
@@ -631,27 +616,12 @@ object CarbonDataRDDFactory {
           ).toArray
         }
 
-        if (useKettle) {
-          status = new DataFileLoaderRDD(sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            storePath,
-            kettleHomePath,
-            columnar,
-            currentLoadCount,
-            tableCreationTime,
-            schemaLastUpdatedTime,
-            blocksGroupBy,
-            isTableSplitPartition
-          ).collect()
-        } else {
-          status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            currentLoadCount,
-            blocksGroupBy,
-            isTableSplitPartition).collect()
-        }
+        status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
+          new DataLoadResultImpl(),
+          carbonLoadModel,
+          currentLoadCount,
+          blocksGroupBy,
+          isTableSplitPartition).collect()
       }
 
       def loadDataFrame(): Unit = {
@@ -665,26 +635,13 @@ object CarbonDataRDDFactory {
             sqlContext.sparkContext)
           val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
 
-          if (useKettle) {
-            status = new DataFrameLoaderRDD(sqlContext.sparkContext,
-              new DataLoadResultImpl(),
-              carbonLoadModel,
-              storePath,
-              kettleHomePath,
-              columnar,
-              currentLoadCount,
-              tableCreationTime,
-              schemaLastUpdatedTime,
-              newRdd).collect()
-          } else {
-            status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
-              new DataLoadResultImpl(),
-              carbonLoadModel,
-              currentLoadCount,
-              tableCreationTime,
-              schemaLastUpdatedTime,
-              newRdd).collect()
-          }
+          status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            currentLoadCount,
+            tableCreationTime,
+            schemaLastUpdatedTime,
+            newRdd).collect()
 
         } catch {
           case ex: Exception =>
@@ -693,103 +650,6 @@ object CarbonDataRDDFactory {
         }
       }
 
-      def loadDataFrameForUpdate(): Unit = {
-        def triggerDataLoadForSegment(key: String,
-            iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
-          val rddResult = new updateResultImpl()
-          val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-          val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
-            var partitionID = "0"
-            val loadMetadataDetails = new LoadMetadataDetails
-            val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-            var uniqueLoadStatusId = ""
-            try {
-              val segId = key
-              val taskNo = CarbonUpdateUtil
-                .getLatestTaskIdForSegment(segId,
-                  CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
-                    carbonTable.getCarbonTableIdentifier))
-              val index = taskNo + 1
-              uniqueLoadStatusId = carbonLoadModel.getTableName +
-                                   CarbonCommonConstants.UNDERSCORE +
-                                   (index + "_0")
-
-              // convert timestamp
-              val timeStampInLong = updateModel.get.updatedTimeStamp + ""
-              loadMetadataDetails.setPartitionCount(partitionID)
-              loadMetadataDetails.setLoadName(segId)
-              loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-              carbonLoadModel.setPartitionId(partitionID)
-              carbonLoadModel.setSegmentId(segId)
-              carbonLoadModel.setTaskNo(String.valueOf(index))
-              carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
-
-              // During Block Spill case Increment of File Count and proper adjustment of Block
-              // naming is only done when AbstractFactDataWriter.java : initializeWriter get
-              // CarbondataFileName as null. For handling Block Spill not setting the
-              // CarbondataFileName in case of Update.
-              // carbonLoadModel.setCarbondataFileName(newBlockName)
-
-              // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index)
-              loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              val rddIteratorKey = CarbonCommonConstants.RDDUTIL_UPDATE_KEY +
-                                   UUID.randomUUID().toString
-
-              try {
-                RddInpututilsForUpdate.put(rddIteratorKey,
-                  new RddIteratorForUpdate(iter, carbonLoadModel))
-                carbonLoadModel.setRddIteratorKey(rddIteratorKey)
-                CarbonDataLoadForUpdate
-                  .run(carbonLoadModel, index, storePath, kettleHomePath,
-                    segId, loadMetadataDetails, executionErrors)
-              } finally {
-                RddInpututilsForUpdate.remove(rddIteratorKey)
-              }
-            } catch {
-              case e: Exception =>
-                LOGGER.info("DataLoad failure")
-                LOGGER.error(e)
-                throw e
-            }
-
-            var finished = false
-
-            override def hasNext: Boolean = !finished
-
-            override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
-              finished = true
-              rddResult
-                .getKey(uniqueLoadStatusId,
-                  (loadMetadataDetails, executionErrors))
-            }
-          }
-          resultIter
-        }
-
-        val updateRdd = dataFrame.get.rdd
-
-
-        val keyRDD = updateRdd.map(row =>
-          // splitting as (key, value) i.e., (segment, updatedRows)
-          (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*))
-        )
-        val groupBySegmentRdd = keyRDD.groupByKey()
-
-        val nodeNumOfData = groupBySegmentRdd.partitions.flatMap[String, Array[String]] { p =>
-          DataLoadPartitionCoalescer.getPreferredLocs(groupBySegmentRdd, p).map(_.host)
-        }.distinct.size
-        val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-          sqlContext.sparkContext)
-        val groupBySegmentAndNodeRdd =
-          new UpdateCoalescedRDD[(String, scala.Iterable[Row])](groupBySegmentRdd,
-            nodes.distinct.toArray)
-
-        res = groupBySegmentAndNodeRdd.map(x =>
-          triggerDataLoadForSegment(x._1, x._2.toIterator).toList
-        ).collect()
-
-      }
-
       if (!updateModel.isDefined) {
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
@@ -798,9 +658,7 @@ object CarbonDataRDDFactory {
       var errorMessage: String = "DataLoad failure"
       var executorMessage: String = ""
       try {
-        if (updateModel.isDefined) {
-          loadDataFrameForUpdate()
-        } else if (dataFrame.isDefined) {
+        if (dataFrame.isDefined) {
           loadDataFrame()
         }
         else {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 63cf9de..e6efeaa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -190,7 +190,6 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
        | LOAD DATA INPATH '$csvFolder'
        | INTO TABLE ${options.dbName}.${options.tableName}
        | OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
-       | 'USE_KETTLE' = '${options.useKettle}',
        | 'SINGLE_PASS' = '${options.singlePass}')
      """.stripMargin
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 002b6f8..4bd0564 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -110,8 +110,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
     carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
     carbonLoadModel.setStorePath(relation.tableMeta.storePath)
 
-    val kettleHomePath = CarbonScalaUtil.getKettleHome(sparkSession.sqlContext)
-
     var storeLocation = CarbonProperties.getInstance
       .getProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
         System.getProperty("java.io.tmpdir")
@@ -123,7 +121,6 @@ case class AlterTableCompaction(alterTableModel: AlterTableModel) extends Runnab
           alterTableModel,
           carbonLoadModel,
           relation.tableMeta.storePath,
-          kettleHomePath,
           storeLocation
         )
     } catch {
@@ -388,29 +385,6 @@ case class LoadTable(
 
       val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
 
-      // TODO It will be removed after kettle is removed.
-      val useKettle = options.get("use_kettle") match {
-        case Some(value) => value.toBoolean
-        case _ =>
-          var useKettleLocal = System.getProperty("use_kettle")
-          if (useKettleLocal == null && sparkSession.sparkContext.getConf.contains("use_kettle")) {
-            useKettleLocal = sparkSession.sparkContext.getConf.get("use_kettle")
-          }
-          if (useKettleLocal == null) {
-            useKettleLocal = CarbonProperties.getInstance().
-              getProperty(CarbonCommonConstants.USE_KETTLE,
-                CarbonCommonConstants.USE_KETTLE_DEFAULT)
-          }
-          try {
-            useKettleLocal.toBoolean
-          } catch {
-            case e: Exception => CarbonCommonConstants.USE_KETTLE_DEFAULT.toBoolean
-          }
-      }
-
-      val kettleHomePath =
-        if (useKettle) CarbonScalaUtil.getKettleHome(sparkSession.sqlContext) else ""
-
       val delimiter = options.getOrElse("delimiter", ",")
       val quoteChar = options.getOrElse("quotechar", "\"")
       val fileHeader = options.getOrElse("fileheader", "")
@@ -455,11 +429,11 @@ case class LoadTable(
           DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," + isEmptyDataBadRecord)
       val useOnePass = options.getOrElse("single_pass", "false").trim.toLowerCase match {
         case "true" =>
-          if (!useKettle && StringUtils.isEmpty(allDictionaryPath)) {
+          if (StringUtils.isEmpty(allDictionaryPath)) {
             true
           } else {
             LOGGER.error("Can't use single_pass, because SINGLE_PASS and ALL_DICTIONARY_PATH" +
-              "can not be used together, and USE_KETTLE must be set as false")
+              " can not be used together")
             false
           }
         case "false" =>
@@ -539,10 +513,8 @@ case class LoadTable(
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
-            kettleHomePath,
             columnar,
             partitionStatus,
-            useKettle,
             result,
             dataFrame,
             updateModel)
@@ -590,10 +562,8 @@ case class LoadTable(
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
-            kettleHomePath,
             columnar,
             partitionStatus,
-            useKettle,
             result,
             loadDataFrame,
             updateModel)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
index 15d5597..e94b6ed 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/test/Spark2TestQueryExecutor.scala
@@ -38,7 +38,6 @@ object Spark2TestQueryExecutor {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   LOGGER.info("use TestQueryExecutorImplV2")
   CarbonProperties.getInstance()
-    .addProperty("carbon.kettle.home", TestQueryExecutor.kettleHome)
     .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, TestQueryExecutor.timestampFormat)
     .addProperty(CarbonCommonConstants.STORE_LOCATION_TEMP_PATH,
       System.getProperty("java.io.tmpdir"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
index bcc82ce..324d3a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableLoader.scala
@@ -78,12 +78,6 @@ object TableLoader {
     System.out.println(s"table name: $dbName.$tableName")
     val inputPaths = TableAPIUtil.escape(args(2))
 
-    val kettleHome = CarbonProperties.getInstance().getProperty("carbon.kettle.home")
-    if (kettleHome == null) {
-      CarbonProperties.getInstance().addProperty("carbon.kettle.home",
-        map.getOrElse("carbon.kettle.home", ""))
-    }
-
     val spark = TableAPIUtil.spark(storePath, s"TableLoader: $dbName.$tableName")
 
     CarbonEnv.init(spark)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 33f710b..642b330 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -54,7 +54,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t4")
       """)
     LoadTable(Some("default"), "t4", s"$resourcesPath/source.csv", Nil,
-      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+      Map()).run(sqlContext.sparkSession)
     val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4")
     if (table != null && table.getBucketingInfo("t4") != null) {
       assert(true)
@@ -90,7 +90,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("tableName"="t5")
       """)
     LoadTable(Some("default"), "t5", s"$resourcesPath/source.csv", Nil,
-      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+      Map()).run(sqlContext.sparkSession)
 
     val plan = sql(
       """
@@ -115,7 +115,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t6")
       """)
     LoadTable(Some("default"), "t6", s"$resourcesPath/source.csv", Nil,
-      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+      Map()).run(sqlContext.sparkSession)
 
     val plan = sql(
       """
@@ -140,7 +140,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t7")
       """)
     LoadTable(Some("default"), "t7", s"$resourcesPath/source.csv", Nil,
-      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+      Map()).run(sqlContext.sparkSession)
 
     sql("DROP TABLE IF EXISTS bucketed_parquet_table")
     sql("select * from t7").write
@@ -171,7 +171,7 @@ class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll {
            OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t8")
       """)
     LoadTable(Some("default"), "t8", s"$resourcesPath/source.csv", Nil,
-      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+      Map()).run(sqlContext.sparkSession)
 
     sql("DROP TABLE IF EXISTS parquet_table")
     sql("select * from t8").write

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
index ceb340e..55eaa20 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/vectorreader/VectorReaderTestCase.scala
@@ -44,7 +44,7 @@ class VectorReaderTestCase extends QueryTest with BeforeAndAfterAll {
           OPTIONS("tableName"="vectorreader")
       """)
     LoadTable(Some("default"), "vectorreader", s"$resourcesPath/source.csv", Nil,
-      Map(("use_kettle", "false"))).run(sqlContext.sparkSession)
+      Map()).run(sqlContext.sparkSession)
   }
 
   test("test vector reader") {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 95a3ff4..a132d6d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,8 +105,6 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <snappy.version>1.1.2.6</snappy.version>
     <hadoop.version>2.2.0</hadoop.version>
-    <kettle.version>4.4.0-stable</kettle.version>
-    <use.kettle>false</use.kettle>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <spark.deps.scope>compile</spark.deps.scope>
     <scala.deps.scope>compile</scala.deps.scope>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/kettle.properties
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/kettle.properties b/processing/carbonplugins/.kettle/kettle.properties
deleted file mode 100644
index 9bae2e3..0000000
--- a/processing/carbonplugins/.kettle/kettle.properties
+++ /dev/null
@@ -1,10 +0,0 @@
-# This file was generated by Pentaho Data Integration version 4.2.1.
-#
-# Here are a few examples of variables to set:
-#
-# PRODUCTION_SERVER = hercules
-# TEST_SERVER = zeus
-# DEVELOPMENT_SERVER = thor
-#
-# Note: lines like these with a # in front of it are comments
-#

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml
deleted file mode 100644
index 2ae5a87..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonaggregatesurrogategenerator/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonAggSurrogateGenerator"
-   description="Carbon Agg Surrogate Generator"
-   tooltip="Carbon Agg Surrogate Generator"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.aggregatesurrogategenerator.step.CarbonAggregateSurrogateGeneratorMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml
deleted file mode 100644
index fc3cdf5..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonautoagggraphgenerator/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonAutoAggGraphGenerator"
-   description="Carbon Auto Agg Graph Generator"
-   tooltip="Carbon Auto Agg Graph Generator"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.autoaggregategraphgenerator.step.CarbonAutoAGGGraphGeneratorMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml
deleted file mode 100644
index a0a2192..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonautoaggslicemerger/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonAutoAggSliceMerger"
-   description="Carbon Auto Agg Slice Merger"
-   tooltip="Carbon Auto Agg Slice Merger"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.merger.step.autoaggregate.CarbonAutoAggregateSliceMergerMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml
deleted file mode 100644
index 91617c5..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboncsvbasedseqgen/plugin.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
-   id="CarbonCSVBasedSurrogateGen"
-   description="Carbon CSV Based Surrogate Key Generator"
-   tooltip="Carbon CSV based Surrogate Key Generator"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedSeqGenMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml
deleted file mode 100644
index 7f33b5e..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreader/plugin.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
-   id="CSVReader"
-   description="Carbon CSVReader"
-   tooltip="Carbon CSVReader"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.csvreader.CsvReaderMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml
deleted file mode 100644
index 7173171..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboncsvreaderstrep/plugin.xml
+++ /dev/null
@@ -1,29 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
-   id="CarbonCSVReaderStep"
-   description="Carbon CSV Reader Step"
-   tooltip="Carbon CSV Reader Step"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.csvreaderstep.CsvInputMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml
deleted file mode 100644
index 92631a1..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbondatawriter/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonDataWriter"
-   description="Carbon Data Writer"
-   tooltip="Carbon Data Writer"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.store.CarbonDataWriterStepMeta">
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml
deleted file mode 100644
index 270bbe3..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonfactreader/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonFactReader"
-   description="Carbon Fact Reader"
-   tooltip="Carbon Fact Reader"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.factreader.step.CarbonFactReaderMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml
deleted file mode 100644
index 087589e..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbongroupby/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonGroupBy"
-   description="Carbon Group By"
-   tooltip="Carbon Group By"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.groupby.step.CarbonGroupByStepMeta">
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml
deleted file mode 100644
index 550531c..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carboninmemoryfactreader/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonInMemoryFactReader"
-   description="Carbon InMemory Fact Reader"
-   tooltip="Carbon InMemory Fact Reader"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.factreader.inmemory.step.CarbonInMemoryFactReaderMeta">
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml
deleted file mode 100644
index 517b757..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonseqgen/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
-   id="CarbonSurrogateGen"
-   description="Carbon Surrogate Key Generator"
-   tooltip="Carbon Surrogate Key Generator"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.surrogatekeysgenerator.dbbased.CarbonSeqGenStepMeta">
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml
deleted file mode 100644
index 618dba0..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonslicemerger/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-
-   id="CarbonSliceMerger"
-   description="Carbon Slice Merger"
-   tooltip="Carbon Slice Merger"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.merger.step.CarbonSliceMergerStepMeta">
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml
deleted file mode 100644
index 970a855..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/carbonsortkeyandgroupby/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="CarbonSortKeyGroupBy"
-   description="Carbon Sort And Group By"
-   tooltip="Carbon Sort And Group By"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.sortandgroupby.step.CarbonSortKeyAndGroupByStepMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml
deleted file mode 100644
index e0892a4..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/mdkeygenstep/plugin.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="MDKeyGen"
-   description="MD Key Gen"
-   tooltip="MD Key Gen"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.mdkeygen.MDKeyGenStepMeta">
-
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml
----------------------------------------------------------------------
diff --git a/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml b/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml
deleted file mode 100644
index d08d0ee..0000000
--- a/processing/carbonplugins/.kettle/plugins/steps/sortkeystep/plugin.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements.  See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership.  The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License.  You may obtain a copy of the License at
- ~
- ~    http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing,
- ~ software distributed under the License is distributed on an
- ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- ~ KIND, either express or implied.  See the License for the
- ~ specific language governing permissions and limitations
- ~ under the License.
- -->
-
-<plugin
-   id="SortKey"
-   description="Carbon Sort"
-   tooltip="Carbon Sort"
-   category="Carbon"
-   classname="org.apache.carbondata.processing.sortandgroupby.sortdatastep.SortKeyStepMeta">
-</plugin>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/pom.xml
----------------------------------------------------------------------
diff --git a/processing/pom.xml b/processing/pom.xml
index 096f49a..57b0908 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -49,37 +49,11 @@
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-sql_${scala.binary.version}</artifactId>
     </dependency>
-    <dependency>
-      <groupId>pentaho-kettle</groupId>
-      <artifactId>kettle-engine</artifactId>
-      <version>${kettle.version}</version>
-    </dependency>
 	<dependency>
       <groupId>com.univocity</groupId>
       <artifactId>univocity-parsers</artifactId>
       <version>1.5.6</version>
     </dependency>
-	<dependency>
-      <groupId>pentaho-kettle</groupId>
-      <artifactId>kettle-core</artifactId>
-      <version>${kettle.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>pentaho-kettle</groupId>
-      <artifactId>kettle-db</artifactId>
-      <version>${kettle.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>commons-vfs</groupId>
-      <artifactId>commons-vfs</artifactId>
-      <version>1.0</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
new file mode 100644
index 0000000..d6d214b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/BlockDetails.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.csvload;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+
+/**
+ * blocks info
+ */
+public class BlockDetails extends FileSplit implements Serializable {
+
+  /**
+   * serialization version
+   */
+  private static final long serialVersionUID = 2293906691860002339L;
+  //block offset
+  private long blockOffset;
+  //block length
+  private long blockLength;
+  //file path which block belong to
+  private String filePath;
+  // locations where this block exists
+  private String[] locations;
+
+  public BlockDetails(Path filePath, long blockOffset, long blockLength, String[] locations) {
+    super(filePath, blockOffset, blockLength, locations);
+    this.filePath = filePath.toString();
+    this.blockOffset = blockOffset;
+    this.blockLength = blockLength;
+    this.locations = locations;
+  }
+
+  public long getBlockOffset() {
+    return blockOffset;
+  }
+
+  public long getBlockLength() {
+    return blockLength;
+  }
+
+  public String getFilePath() {
+    return FileFactory.getUpdatedFilePath(filePath);
+  }
+
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+  }
+
+  public String[] getLocations() {
+    return locations;
+  }
+
+  /** The file containing this split's data. */
+  @Override
+  public Path getPath() { return new Path(filePath); }
+
+  /** The position of the first byte in the file to process. */
+  @Override
+  public long getStart() { return blockOffset; }
+
+  /** The number of bytes in the file to process. */
+  @Override
+  public long getLength() { return blockLength; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/e6b60907/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java b/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
deleted file mode 100644
index 7e6f6f4..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/DataGraphExecuter.java
+++ /dev/null
@@ -1,475 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.csvload;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
-import org.apache.carbondata.processing.constants.DataProcessorConstants;
-import org.apache.carbondata.processing.csvreaderstep.CsvInputMeta;
-import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
-import org.apache.carbondata.processing.etl.DataLoadingException;
-import org.apache.carbondata.processing.model.CarbonDataLoadSchema;
-import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.pentaho.di.core.KettleEnvironment;
-import org.pentaho.di.core.exception.KettleException;
-import org.pentaho.di.core.logging.LogLevel;
-import org.pentaho.di.core.logging.LoggingObjectInterface;
-import org.pentaho.di.core.logging.LoggingRegistry;
-import org.pentaho.di.core.xml.XMLHandlerCache;
-import org.pentaho.di.trans.Trans;
-import org.pentaho.di.trans.TransMeta;
-import org.pentaho.di.trans.step.StepMeta;
-import org.pentaho.di.trans.steps.getfilenames.GetFileNamesMeta;
-import org.pentaho.di.trans.steps.hadoopfileinput.HadoopFileInputMeta;
-import org.pentaho.di.trans.steps.textfileinput.TextFileInputField;
-
-public class DataGraphExecuter {
-  /**
-   * Comment for <code>LOGGER</code>
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataGraphExecuter.class.getName());
-  /**
-   * graph transformation object
-   */
-  private Trans trans;
-  /**
-   *
-   */
-  private IDataProcessStatus model;
-
-  public DataGraphExecuter(IDataProcessStatus model) {
-    this.model = model;
-  }
-
-  /**
-   * This Method checks whether csv file provided and the column name in schema are same
-   * or not
-   *
-   * @param columnNames
-   * @param csvFilePath
-   * @return true if same, false otherwise.
-   */
-  private boolean checkCSVAndRequestedTableColumns(String[] columnNames, String csvFilePath,
-      String delimiter) throws IOException {
-    return GraphExecutionUtil.checkCSVAndRequestedTableColumns(csvFilePath, columnNames, delimiter);
-  }
-
-  /**
-   * This method returns the Columns names from the schema.
-   *
-   * @param tableName
-   * @return column names array.
-   */
-  private String[] getColumnNames(String tableName, CarbonDataLoadSchema schema) {
-    Set<String> columnNames = CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName);
-    return columnNames.toArray(new String[columnNames.size()]);
-  }
-
-  private void validateCSV(String tableName, CarbonFile f, CarbonDataLoadSchema schema,
-      String delimiter) throws DataLoadingException, IOException {
-
-    String[] columnNames = getColumnNames(tableName, schema);
-
-    if (!checkCSVAndRequestedTableColumns(columnNames, f.getAbsolutePath(), delimiter)) {
-      LOGGER.error(
-          "CSV File provided is not proper. Column names in schema and csv header are not same. "
-              + "CSVFile Name : "
-              + f.getName());
-      throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
-          "CSV File provided is not proper. Column names in schema and csv header are not same. "
-              + "CSVFile Name : "
-              + f.getName());
-    }
-  }
-
-  public void executeGraph(String graphFilePath, SchemaInfo schemaInfo, CarbonDataLoadSchema schema)
-      throws DataLoadingException {
-
-    //This Method will validate the both fact and dimension csv files.
-    if (!schemaInfo.isAutoAggregateRequest() && model.getRddIteratorKey() == null) {
-      validateCSVFiles(schema);
-    }
-    execute(graphFilePath, schemaInfo);
-  }
-
-  /**
-   * executeGraph which generate the kettle graph
-   *
-   * @throws DataLoadingException
-   */
-
-  private void execute(String graphFilePath, SchemaInfo schemaInfo)
-      throws DataLoadingException {
-
-    //This Method will validate the both fact and dimension csv files.
-
-    initKettleEnv();
-    TransMeta transMeta = null;
-    try {
-      transMeta = new TransMeta(graphFilePath);
-      transMeta.setFilename(graphFilePath);
-      trans = new Trans(transMeta);
-      if (!schemaInfo.isAutoAggregateRequest()) {
-        // Register HDFS as a file system type with VFS to make HadoopFileInputMeta work
-        boolean hdfsReadMode =
-            model.getCsvFilePath() != null && model.getCsvFilePath().startsWith("hdfs:");
-        trans.setVariable("modifiedDimNames", model.getDimTables());
-        trans.setVariable("csvInputFilePath", model.getCsvFilePath());
-        trans.setVariable(CarbonCommonConstants.BAD_RECORD_KEY, null);
-        if (hdfsReadMode) {
-          trans.addParameterDefinition("vfs.hdfs.dfs.client.read.shortcircuit", "true", "");
-          trans.addParameterDefinition("vfs.hdfs.dfs.domain.socket.path",
-              "/var/lib/hadoop-hdfs-new/dn_socket", "");
-          trans.addParameterDefinition("vfs.hdfs.dfs.block.local-path-access.user", "hadoop,root",
-              "");
-          trans.addParameterDefinition("vfs.hdfs.io.file.buffer.size", "5048576", "");
-        }
-        List<StepMeta> stepsMeta = trans.getTransMeta().getSteps();
-        StringBuilder builder = new StringBuilder();
-        StringBuilder measuresInCSVFile = new StringBuilder();
-        processCsvInputMeta(stepsMeta, builder, measuresInCSVFile);
-        processGetFileNamesMeta(stepsMeta);
-
-        processHadoopFileInputMeta(stepsMeta, builder, measuresInCSVFile);
-      }
-      setGraphLogLevel();
-      trans.execute(null);
-      LOGGER.info("Graph execution is started " + graphFilePath);
-      trans.waitUntilFinished();
-      LOGGER.info("Graph execution is finished.");
-    } catch (KettleException | IOException e) {
-      LOGGER.error(e, "Unable to start execution of graph " + e.getMessage());
-      throw new DataLoadingException("Unable to start execution of graph ", e);
-    }
-
-    //Don't change the logic of creating key
-    String key = model.getDatabaseName() + '/' + model.getTableName() + '_' + model.getTableName();
-
-    if (trans.getErrors() > 0) {
-      if (null != trans.getVariable(CarbonCommonConstants.BAD_RECORD_KEY)) {
-        LOGGER.error(trans.getVariable(CarbonCommonConstants.BAD_RECORD_KEY));
-        throw new DataLoadingException(
-            "Data load failed due to bad record ," + trans
-                .getVariable(CarbonCommonConstants.BAD_RECORD_KEY));
-      }
-      LOGGER.error("Graph Execution had errors");
-      throw new DataLoadingException("Due to internal errors, please check logs for more details.");
-    } else if (null != BadRecordsLogger.hasBadRecord(key)) {
-      LOGGER.error("Data load is partially success");
-      throw new DataLoadingException(DataProcessorConstants.BAD_REC_FOUND,
-          "Data load is partially success");
-    } else {
-      LOGGER.info("Graph execution task is over with No error.");
-    }
-    LoggingRegistry instance = LoggingRegistry.getInstance();
-    Map<String, LoggingObjectInterface> map = instance.getMap();
-    if (null != map) {
-      for (Entry<String, LoggingObjectInterface> entry : map.entrySet()) {
-        instance.removeIncludingChildren(entry.getKey());
-      }
-    }
-
-    map = null;
-    XMLHandlerCache.getInstance().clear();
-    trans.cleanup();
-    trans.eraseParameters();
-    trans.killAll();
-    trans = null;
-  }
-
-  /**
-   * @param stepsMeta
-   * @param builder
-   * @param measuresInCSVFile
-   * @throws DataLoadingException
-   */
-  private void processHadoopFileInputMeta(List<StepMeta> stepsMeta, StringBuilder builder,
-      StringBuilder measuresInCSVFile) throws DataLoadingException {
-    for (StepMeta step : stepsMeta) {
-      if (step.getStepMetaInterface() instanceof HadoopFileInputMeta) {
-
-        HadoopFileInputMeta stepMetaInterface = (HadoopFileInputMeta) step.getStepMetaInterface();
-        if (null != model.getCsvFilePath()) {
-          stepMetaInterface.setFilenameField("filename");
-          stepMetaInterface.setFileName(new String[] { "${csvInputFilePath}" });
-          stepMetaInterface.setDefault();
-          stepMetaInterface.setEncoding(CarbonCommonConstants.DEFAULT_CHARSET);
-          stepMetaInterface.setEnclosure("\"");
-          stepMetaInterface.setHeader(true);
-          stepMetaInterface.setSeparator(",");
-          stepMetaInterface.setAcceptingFilenames(true);
-          stepMetaInterface.setAcceptingStepName("getFileNames");
-          stepMetaInterface.setFileFormat("mixed");
-          stepMetaInterface.setAcceptingField("filename");
-
-          CarbonFile csvFileToRead = GraphExecutionUtil.getCsvFileToRead(model.getCsvFilePath());
-          TextFileInputField[] inputFields = GraphExecutionUtil
-              .getTextInputFiles(csvFileToRead, builder, measuresInCSVFile, ",");
-          stepMetaInterface.setInputFields(inputFields);
-        } else if (model.isDirectLoad()) {
-          String[] files = new String[model.getFilesToProcess().size()];
-          int i = 0;
-          for (String file : model.getFilesToProcess()) {
-            files[i++] = file;
-          }
-          stepMetaInterface.setFileName(files);
-          stepMetaInterface.setFilenameField("filename");
-          stepMetaInterface.setDefault();
-          stepMetaInterface.setEncoding(CarbonCommonConstants.DEFAULT_CHARSET);
-          stepMetaInterface.setEnclosure("\"");
-          stepMetaInterface.setHeader(true);
-          stepMetaInterface.setSeparator(",");
-          stepMetaInterface.setAcceptingFilenames(true);
-          stepMetaInterface.setAcceptingStepName("getFileNames");
-          stepMetaInterface.setFileFormat("mixed");
-          stepMetaInterface.setAcceptingField("filename");
-
-          if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
-            TextFileInputField[] inputParams = GraphExecutionUtil
-                .getTextInputFiles(model.getCsvHeader(), builder, measuresInCSVFile, ",");
-            ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputParams);
-            ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
-            ((CsvInputMeta) step.getStepMetaInterface())
-              .setEscapeCharacter(model.getEscapeCharacter());
-            ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(false);
-
-          } else if (model.getFilesToProcess().size() > 0) {
-            CarbonFile csvFile =
-                GraphExecutionUtil.getCsvFileToRead(model.getFilesToProcess().get(0));
-            TextFileInputField[] inputFields = GraphExecutionUtil
-                .getTextInputFiles(csvFile, builder, measuresInCSVFile,
-                    model.getCsvDelimiter());
-            ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
-            ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
-            ((CsvInputMeta) step.getStepMetaInterface())
-              .setEscapeCharacter(model.getEscapeCharacter());
-            ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(true);
-          }
-        }
-
-        break;
-      }
-    }
-  }
-
-  /**
-   * @param stepsMeta
-   * @throws IOException
-   */
-  private void processGetFileNamesMeta(List<StepMeta> stepsMeta) throws IOException {
-    for (StepMeta step : stepsMeta) {
-      if (step.getStepMetaInterface() instanceof GetFileNamesMeta) {
-        GetFileNamesMeta stepMetaInterface = (GetFileNamesMeta) step.getStepMetaInterface();
-        if (null != model.getCsvFilePath()) {
-          boolean checkIsFolder = GraphExecutionUtil.checkIsFolder(model.getCsvFilePath());
-          if (checkIsFolder) {
-            stepMetaInterface.setFileName(new String[] { model.getCsvFilePath() });
-            stepMetaInterface.setFileMask(new String[] { ".*\\.csv$|.*\\.inprogress" });
-            stepMetaInterface.setExcludeFileMask(new String[] { "1" });
-          } else {
-            //If absolute file path is provided for the data load and stopped in between then csv
-            // file will be
-            // changed to inprogress, and when next time server start then we need to check the
-            // file name extension.
-            // can contain .csv.inprogress file.
-
-            FileType fileType = FileFactory.getFileType(model.getCsvFilePath());
-
-            boolean exists = FileFactory.isFileExist(model.getCsvFilePath(), fileType);
-
-            if (exists) {
-              stepMetaInterface.setFileName(new String[] { model.getCsvFilePath() });
-              stepMetaInterface.setExcludeFileMask(new String[] { null });
-            } else {
-              stepMetaInterface.setFileName(new String[] {
-                  model.getCsvFilePath() + CarbonCommonConstants.FILE_INPROGRESS_STATUS });
-              stepMetaInterface.setExcludeFileMask(new String[] { null });
-            }
-          }
-        } else if (model.isDirectLoad()) {
-          String[] files = new String[model.getFilesToProcess().size()];
-          int i = 0;
-          for (String file : model.getFilesToProcess()) {
-            files[i++] = file;
-          }
-          stepMetaInterface.setFileName(files);
-        }
-        break;
-      }
-    }
-  }
-
-  /**
-   * @param stepsMeta
-   * @param builder
-   * @param measuresInCSVFile
-   * @throws DataLoadingException
-   */
-  private void processCsvInputMeta(List<StepMeta> stepsMeta, StringBuilder builder,
-      StringBuilder measuresInCSVFile) throws DataLoadingException {
-    for (StepMeta step : stepsMeta) {
-      if (step.getStepMetaInterface() instanceof CsvInputMeta) {
-        if (null != model.getCsvFilePath() && model.getRddIteratorKey() == null) {
-          CarbonFile csvFileToRead = GraphExecutionUtil.getCsvFileToRead(model.getCsvFilePath());
-          TextFileInputField[] inputFields = GraphExecutionUtil
-              .getTextInputFiles(csvFileToRead, builder, measuresInCSVFile, ",");
-          ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
-        } else if (model.isDirectLoad()) {
-          if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
-            TextFileInputField[] inputFields = GraphExecutionUtil
-                .getTextInputFiles(model.getCsvHeader(), builder, measuresInCSVFile, ",");
-            ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
-            ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
-            ((CsvInputMeta) step.getStepMetaInterface())
-                .setEscapeCharacter(model.getEscapeCharacter());
-            ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(false);
-
-          } else if (model.getFilesToProcess().size() > 0) {
-            CarbonFile csvFileToRead =
-                GraphExecutionUtil.getCsvFileToRead(model.getFilesToProcess().get(0));
-            TextFileInputField[] inputFields = GraphExecutionUtil
-                .getTextInputFiles(csvFileToRead, builder, measuresInCSVFile,
-                    model.getCsvDelimiter());
-            ((CsvInputMeta) step.getStepMetaInterface()).setInputFields(inputFields);
-            ((CsvInputMeta) step.getStepMetaInterface()).setDelimiter(model.getCsvDelimiter());
-            ((CsvInputMeta) step.getStepMetaInterface())
-              .setEscapeCharacter(model.getEscapeCharacter());
-            ((CsvInputMeta) step.getStepMetaInterface()).setHeaderPresent(true);
-          }
-        }
-        break;
-      }
-    }
-  }
-
-  /**
-   *
-   */
-  private void initKettleEnv() {
-    try {
-      KettleEnvironment.init(false);
-      LOGGER.info("Kettle environment initialized");
-    } catch (KettleException ke) {
-      LOGGER.error("Unable to initialize Kettle Environment " + ke.getMessage());
-    }
-  }
-
-
-  private void setGraphLogLevel() {
-    trans.setLogLevel(LogLevel.NOTHING);
-  }
-
-  /**
-   * This method will validate the both fact as well as dimension csv files.
-   *
-   * @param schema
-   * @throws DataLoadingException
-   */
-  private void validateCSVFiles(CarbonDataLoadSchema schema) throws DataLoadingException {
-    // Validate the Fact CSV Files.
-    String csvFilePath = model.getCsvFilePath();
-    if (csvFilePath != null) {
-      FileType fileType = FileFactory.getFileType(csvFilePath);
-      try {
-        boolean exists = FileFactory.isFileExist(csvFilePath, fileType);
-        if (exists && FileFactory.getCarbonFile(csvFilePath, fileType).isDirectory()) {
-          CarbonFile fileDir = FileFactory.getCarbonFile(csvFilePath, fileType);
-          CarbonFile[] listFiles = fileDir.listFiles(new CarbonFileFilter() {
-
-            @Override public boolean accept(CarbonFile pathname) {
-              if (pathname.getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || pathname
-                  .getName().endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
-                      + CarbonCommonConstants.FILE_INPROGRESS_STATUS)) {
-                return true;
-              }
-              return false;
-            }
-          });
-
-          for (CarbonFile f : listFiles) {
-            validateCSV(model.getTableName(), f, schema, ",");
-          }
-        } else {
-
-          if (!(csvFilePath.endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION) || csvFilePath
-              .endsWith(CarbonCommonConstants.CSV_FILE_EXTENSION
-                  + CarbonCommonConstants.FILE_INPROGRESS_STATUS))) {
-            LOGGER.error("File provided is not proper, Only csv files are allowed." + csvFilePath);
-            throw new DataLoadingException(
-                "File provided is not proper, Only csv files are allowed." + csvFilePath);
-          }
-
-          if (exists) {
-            validateCSV(model.getTableName(),
-                FileFactory.getCarbonFile(csvFilePath, fileType), schema, ",");
-          } else {
-            validateCSV(model.getTableName(), FileFactory
-                .getCarbonFile(csvFilePath + CarbonCommonConstants.FILE_INPROGRESS_STATUS,
-                    fileType), schema, ",");
-          }
-
-        }
-
-      } catch (IOException e) {
-        LOGGER.error(e,
-            "Error while checking file exists" + csvFilePath);
-      }
-    } else if (model.isDirectLoad()) {
-      if (null != model.getCsvHeader() && !model.getCsvHeader().isEmpty()) {
-        if (!CarbonDataProcessorUtil
-            .isHeaderValid(model.getTableName(), model.getCsvHeader(), schema, ",")) {
-          LOGGER.error("CSV header provided in DDL is not proper."
-              + " Column names in schema and CSV header are not the same.");
-          throw new DataLoadingException(DataProcessorConstants.CSV_VALIDATION_ERRROR_CODE,
-              "CSV header provided in DDL is not proper. Column names in schema and CSV header are "
-                  + "not the same.");
-        }
-      } else {
-        for (String file : model.getFilesToProcess()) {
-          try {
-            FileFactory.FileType fileType = FileFactory.getFileType(file);
-            if (FileFactory.isFileExist(file, fileType)) {
-              validateCSV(model.getTableName(),
-                  FileFactory.getCarbonFile(file, fileType), schema,
-                  model.getCsvDelimiter());
-            }
-          } catch (IOException e) {
-            LOGGER.error(e,
-                "Error while checking file exists" + file);
-          }
-        }
-      }
-    }
-  }
-
-}