You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:43 UTC

[05/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
deleted file mode 100644
index cd629bf..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv
-
-import java.io.IOException
-
-import scala.collection.JavaConverters._
-import scala.util.control.NonFatal
-
-import com.databricks.spark.csv.newapi.CarbonTextFile
-import com.databricks.spark.csv.util._
-import com.databricks.spark.sql.readers._
-import org.apache.commons.csv._
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.fs.Path
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
-import org.apache.spark.sql.types._
-import org.slf4j.LoggerFactory
-
-import org.apache.carbondata.processing.etl.DataLoadingException
-
-/**
- * Relation over CSV text at `location`. It supports full table scans and INSERT OVERWRITE.
- *
- * @param location                 path handed to CarbonTextFile to build the line RDD
- * @param useHeader                when true, the first usable line names the columns and every
- *                                 line equal to it is dropped from the data
- * @param delimiter                field separator
- * @param quote                    quote character
- * @param escape                   escape character; null falls back to '\\' for the Univocity
- *                                 reader
- * @param comment                  comment marker; null means the very first line is used as
- *                                 the first line without looking for comments
- * @param parseMode                parse mode name, interpreted through ParseModes
- * @param parserLib                parser library name, interpreted through ParserLibs
- * @param ignoreLeadingWhiteSpace  forwarded to the Univocity reader
- * @param ignoreTrailingWhiteSpace forwarded to the Univocity reader
- * @param userSchema               schema used as-is when not null; otherwise it is derived from
- *                                 the first line (and optionally from the data)
- * @param charset                  name of the charset used to decode the input
- * @param inferCsvSchema           when true and no userSchema is given, column types are
- *                                 inferred with InferSchema; otherwise all columns are strings
- */
-case class CarbonCsvRelation protected[spark] (
-    location: String,
-    useHeader: Boolean,
-    delimiter: Char,
-    quote: Char,
-    escape: Character,
-    comment: Character,
-    parseMode: String,
-    parserLib: String,
-    ignoreLeadingWhiteSpace: Boolean,
-    ignoreTrailingWhiteSpace: Boolean,
-    userSchema: StructType = null,
-    charset: String = TextFile.DEFAULT_CHARSET.name(),
-    inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
-  extends BaseRelation with TableScan with InsertableRelation {
-
-  /**
-   * Limit the number of lines we'll search for a header row that isn't comment-prefixed.
-   */
-  private val MAX_COMMENT_LINES_IN_HEADER = 10
-
-  private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass)
-
-  // An invalid parse mode is reported but not rejected.
-  if (!ParseModes.isValidMode(parseMode)) {
-    logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
-  }
-
-  // Warn when either white space option is combined with the Commons parser.
-  if ((ignoreLeadingWhiteSpace || ignoreTrailingWhiteSpace) &&
-      ParserLibs.isCommonsLib(parserLib)) {
-    logger.warn(s"Ignore white space options may not work with Commons parserLib option")
-  }
-
-  private val failFast = ParseModes.isFailFastMode(parseMode)
-  private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
-  private val permissive = ParseModes.isPermissiveMode(parseMode)
-
-  val schema: StructType = inferSchema()
-
-  /**
-   * Splits every input line into tokens using the configured parser library.
-   *
-   * @param header column names handed to the parser
-   * @return one token array per parsed line
-   */
-  def tokenRdd(header: Array[String]): RDD[Array[String]] = {
-
-    val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
-
-    if (ParserLibs.isUnivocityLib(parserLib)) {
-      univocityParseCSV(baseRDD, header)
-    } else {
-      val csvFormat = CSVFormat.DEFAULT
-        .withDelimiter(delimiter)
-        .withQuote(quote)
-        .withEscape(escape)
-        .withSkipHeaderRecord(false)
-        .withHeader(header: _*)
-        .withCommentMarker(comment)
-
-      // If header is set, make sure firstLine is materialized before sending to executors.
-      val filterLine = if (useHeader) firstLine else null
-
-      baseRDD.mapPartitions { iter =>
-        // When using header, any input line that equals firstLine is assumed to be header
-        val csvIter = if (useHeader) {
-          iter.filter(_ != filterLine)
-        } else {
-          iter
-        }
-        parseCSV(csvIter, csvFormat)
-      }
-    }
-  }
-
-  /**
-   * Builds the row RDD for a full scan. This is a plain method, so every call creates a new
-   * token RDD; nothing is cached here.
-   *
-   * A line whose token count differs from the schema is dropped in drop-malformed mode and
-   * raises a RuntimeException in fail-fast mode. In permissive mode a short line is padded
-   * with nulls.
-   */
-  def buildScan: RDD[Row] = {
-    val schemaFields = schema.fields
-    tokenRdd(schemaFields.map(_.name)).flatMap { tokens =>
-      val malformed = schemaFields.length != tokens.size
-
-      if (dropMalformed && malformed) {
-        // Render the tokens explicitly; interpolating the array prints only its reference.
-        logger.warn(s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}")
-        None
-      } else if (failFast && malformed) {
-        throw new RuntimeException(
-          s"Malformed line in FAILFAST mode: ${tokens.mkString(delimiter.toString)}")
-      } else {
-        var index: Int = 0
-        val rowArray = new Array[Any](schemaFields.length)
-        try {
-          index = 0
-          while (index < schemaFields.length) {
-            val field = schemaFields(index)
-            rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable)
-            index = index + 1
-          }
-          Some(Row.fromSeq(rowArray))
-        } catch {
-          // The line had fewer tokens than the schema: fill the remaining columns with null.
-          case aiob: ArrayIndexOutOfBoundsException if permissive =>
-            (index until schemaFields.length).foreach(ind => rowArray(ind) = null)
-            Some(Row.fromSeq(rowArray))
-        }
-      }
-    }
-  }
-
-  /**
-   * Returns userSchema when given. Otherwise parses the first line to obtain the column
-   * names (or generated names C0, C1, ... when there is no header) and either infers the
-   * column types or declares every column as a nullable string.
-   */
-  private def inferSchema(): StructType = {
-    if (this.userSchema != null) {
-      userSchema
-    } else {
-      val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) {
-        val escapeVal = if (escape == null) '\\' else escape.charValue()
-        val commentChar: Char = if (comment == null) '\0' else comment
-        new LineCsvReader(fieldSep = delimiter, quote = quote, escape = escapeVal,
-          commentMarker = commentChar).parseLine(firstLine)
-      } else {
-        val csvFormat = CSVFormat.DEFAULT
-          .withDelimiter(delimiter)
-          .withQuote(quote)
-          .withEscape(escape)
-          .withSkipHeaderRecord(false)
-        CSVParser.parse(firstLine, csvFormat).getRecords.get(0).asScala.toArray
-      }
-      if (null == firstRow) {
-        throw new DataLoadingException("First line of the csv is not valid.")
-      }
-      val header = if (useHeader) {
-        firstRow
-      } else {
-        firstRow.zipWithIndex.map { case (value, index) => s"C$index"}
-      }
-      if (this.inferCsvSchema) {
-        InferSchema(tokenRdd(header), header)
-      } else {
-        // By default fields are assumed to be StringType
-        val schemaFields = header.map { fieldName =>
-          StructField(fieldName.toString, StringType, nullable = true)
-        }
-        StructType(schemaFields)
-      }
-    }
-  }
-
-  /**
-   * The first line of the input. When a comment marker is set, this is the first non-empty
-   * line among the first MAX_COMMENT_LINES_IN_HEADER lines that does not start with it.
-   */
-  private lazy val firstLine = {
-    val csv = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
-    if (comment == null) {
-      csv.first()
-    } else {
-      csv.take(MAX_COMMENT_LINES_IN_HEADER)
-        .find(x => !StringUtils.isEmpty(x) && !x.startsWith(comment.toString))
-        .getOrElse(sys.error(s"No uncommented header line in " +
-          s"first $MAX_COMMENT_LINES_IN_HEADER lines"))
-    }
-   }
-
-  /**
-   * Tokenizes the lines of `file` with CarbonBulkCsvReader, one reader per partition.
-   */
-  private def univocityParseCSV(
-     file: RDD[String],
-     header: Seq[String]): RDD[Array[String]] = {
-    // If header is set, make sure firstLine is materialized before sending to executors.
-    val filterLine = if (useHeader) firstLine else null
-    val dataLines = if (useHeader) file.filter(_ != filterLine) else file
-    val rows = dataLines.mapPartitionsWithIndex({
-      case (split, iter) =>
-        val escapeVal = if (escape == null) '\\' else escape.charValue()
-        val commentChar: Char = if (comment == null) '\0' else comment
-
-        new CarbonBulkCsvReader(iter, split,
-          headers = header, fieldSep = delimiter,
-          quote = quote, escape = escapeVal, commentMarker = commentChar,
-          ignoreLeadingSpace = ignoreLeadingWhiteSpace,
-          ignoreTrailingSpace = ignoreTrailingWhiteSpace)
-    }, true)
-    rows
-  }
-
-  /**
-   * Tokenizes lines one by one with Commons CSV. Lines that yield no record are skipped.
-   * Unless fail-fast mode is on, a line that fails to parse is logged and skipped.
-   */
-  private def parseCSV(
-      iter: Iterator[String],
-      csvFormat: CSVFormat): Iterator[Array[String]] = {
-    iter.flatMap { line =>
-      try {
-        val records = CSVParser.parse(line, csvFormat).getRecords
-        if (records.isEmpty) {
-          logger.warn(s"Ignoring empty line: $line")
-          None
-        } else {
-          Some(records.get(0).asScala.toArray)
-        }
-      } catch {
-        case NonFatal(e) if !failFast =>
-          logger.error(s"Exception while parsing line: $line. ", e)
-          None
-      }
-    }
-  }
-
-  /**
-   * Replaces the content at `location` with `data`. Only overwrite is supported; a call
-   * with overwrite = false fails through sys.error.
-   */
-  // The function below was borrowed from JSONRelation
-  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    val filesystemPath = new Path(location)
-    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-
-    if (overwrite) {
-      try {
-        fs.delete(filesystemPath, true)
-      } catch {
-        case e: IOException =>
-          throw new IOException(
-            s"Unable to clear output directory ${filesystemPath.toString} prior"
-              + s" to INSERT OVERWRITE a CSV table:\n${e.toString}")
-      }
-      // Write the data. We assume that schema isn't changed, and we won't update it.
-      data.saveAsCsvFile(location, Map("delimiter" -> delimiter.toString))
-    } else {
-      sys.error("CSV tables only support INSERT OVERWRITE for now.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
deleted file mode 100644
index 3589823..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv.newapi
-
-import java.nio.charset.Charset
-
-import com.databricks.spark.csv.util.TextFile
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{LongWritable, Text}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
-import org.apache.spark.SparkContext
-import org.apache.spark.rdd.{NewHadoopRDD, RDD}
-
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
-
-/**
- * Creates line RDDs over text input read through Hadoop's new-API TextInputFormat.
- */
-private[csv] object CarbonTextFile {
-
-  // Codec class names registered under "io.compression.codecs".
-  private val compressionCodecs = Seq(
-    "org.apache.hadoop.io.compress.GzipCodec",
-    "org.apache.hadoop.io.compress.DefaultCodec",
-    "org.apache.hadoop.io.compress.BZip2Codec")
-
-  /**
-   * Builds a NewHadoopRDD of (offset, line) pairs over `location`, read recursively.
-   * The SparkContext's Hadoop configuration is copied, not modified.
-   */
-  private def newHadoopRDD(sc: SparkContext, location: String) = {
-    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
-    hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
-    hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
-    // Join with plain commas so the value carries no newlines or indentation.
-    hadoopConfiguration.set("io.compression.codecs", compressionCodecs.mkString(","))
-
-    CarbonDataRDDFactory.configSplitMaxSize(sc, location, hadoopConfiguration)
-    new NewHadoopRDD[LongWritable, Text](
-      sc,
-      classOf[TextInputFormat],
-      classOf[LongWritable],
-      classOf[Text],
-      hadoopConfiguration).setName("newHadoopRDD-spark-csv")
-  }
-
-  /**
-   * Returns the lines found under `location`, decoded with the named charset.
-   *
-   * @param sc       Spark context used to create the RDD
-   * @param location input path
-   * @param charset  charset name; must be resolvable by Charset.forName
-   */
-  def withCharset(sc: SparkContext, location: String, charset: String): RDD[String] = {
-    if (Charset.forName(charset) == TextFile.DEFAULT_CHARSET) {
-      newHadoopRDD(sc, location).map(pair => pair._2.toString)
-    } else {
-      // can't pass a Charset object here cause its not serializable
-      // TODO: maybe use mapPartitions instead?
-      // Only the first getLength bytes of the Text buffer are decoded.
-      newHadoopRDD(sc, location).map(
-        pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
deleted file mode 100644
index cd76651..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.databricks.spark.csv.newapi
-
-import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
-import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
-
-/**
- * Provides access to CSV data from pure SQL statements (i.e. for users of the
- * JDBC server).
- */
-class DefaultSource
-    extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
-
-  /** Returns the mandatory 'path' option, failing through sys.error when it is missing. */
-  private def checkPath(parameters: Map[String, String]): String = {
-    parameters.getOrElse("path", sys.error("'path' must be specified for CSV data."))
-  }
-
-  /**
-   * Reads a strict "true"/"false" option that defaults to "false".
-   * Any other value raises an Exception carrying `errorMessage`.
-   */
-  private def booleanOption(
-      parameters: Map[String, String],
-      key: String,
-      errorMessage: String): Boolean = {
-    parameters.getOrElse(key, "false") match {
-      case "true" => true
-      case "false" => false
-      case _ => throw new Exception(errorMessage)
-    }
-  }
-
-  /**
-   * Reads one of the ignore-white-space options. Enabling it with a parser other than
-   * Univocity raises an Exception.
-   */
-  private def whiteSpaceOption(
-      parameters: Map[String, String],
-      key: String,
-      parserLib: String): Boolean = {
-    val enabled = booleanOption(parameters, key, "Ignore white space flag can be true or false")
-    if (enabled && !ParserLibs.isUnivocityLib(parserLib)) {
-      throw new Exception("Ignore whitespace supported for the Univocity parser only")
-    }
-    enabled
-  }
-
-  /**
-   * Converts an optional single-character option. A null or empty value means "not set"
-   * and yields null; a value longer than one character raises an Exception.
-   */
-  private def optionalChar(value: String, errorMessage: String): Character = {
-    if (value == null || value.isEmpty) {
-      null
-    } else if (value.length == 1) {
-      value.charAt(0)
-    } else {
-      throw new Exception(errorMessage)
-    }
-  }
-
-  /**
-   * Creates a new relation for data store in CSV given parameters.
-   * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
-   */
-  override def createRelation(sqlContext: SQLContext,
-      parameters: Map[String, String]): BaseRelation = {
-    createRelation(sqlContext, parameters, null)
-  }
-
-  /**
-   * Creates a new relation for data store in CSV given parameters and user supported schema.
-   * Parameters have to include 'path' and optionally 'delimiter', 'quote', and 'header'
-   *
-   * Further options read here: 'escape', 'comment' (default "#"; an empty value disables
-   * it), 'mode' (default "PERMISSIVE"), 'parserLib', 'ignoreLeadingWhiteSpace',
-   * 'ignoreTrailingWhiteSpace', 'charset' and 'inferSchema'.
-   */
-  override def createRelation(
-    sqlContext: SQLContext,
-    parameters: Map[String, String],
-    schema: StructType): BaseRelation = {
-    val path = checkPath(parameters)
-    val delimiter = TypeCast.toChar(parameters.getOrElse("delimiter", ","))
-
-    val quote = parameters.getOrElse("quote", "\"")
-    val quoteChar = if (quote.length == 1) {
-      quote.charAt(0)
-    } else {
-      throw new Exception("Quotation cannot be more than one character.")
-    }
-
-    val escapeChar = optionalChar(
-      parameters.getOrElse("escape", null),
-      "Escape character cannot be more than one character.")
-
-    val commentChar = optionalChar(
-      parameters.getOrElse("comment", "#"),
-      "Comment marker cannot be more than one character.")
-
-    val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
-
-    val headerFlag = booleanOption(parameters, "header", "Header flag can be true or false")
-
-    val parserLib = parameters.getOrElse("parserLib", ParserLibs.DEFAULT)
-    val ignoreLeadingWhiteSpaceFlag =
-      whiteSpaceOption(parameters, "ignoreLeadingWhiteSpace", parserLib)
-    val ignoreTrailingWhiteSpaceFlag =
-      whiteSpaceOption(parameters, "ignoreTrailingWhiteSpace", parserLib)
-
-    val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name())
-    // TODO validate charset?
-
-    val inferSchemaFlag =
-      booleanOption(parameters, "inferSchema", "Infer schema flag can be true or false")
-
-    CarbonCsvRelation(path,
-      headerFlag,
-      delimiter,
-      quoteChar,
-      escapeChar,
-      commentChar,
-      parseMode,
-      parserLib,
-      ignoreLeadingWhiteSpaceFlag,
-      ignoreTrailingWhiteSpaceFlag,
-      schema,
-      charset,
-      inferSchemaFlag)(sqlContext)
-  }
-
-  /**
-   * Saves `data` as CSV at 'path' according to `mode` and returns a relation over it.
-   * When the path already exists: Append and ErrorIfExists fail, Overwrite deletes the
-   * path first, and Ignore skips the save. The 'codec' option "gzip" selects GzipCodec;
-   * any other value means no codec.
-   */
-  override def createRelation(
-    sqlContext: SQLContext,
-    mode: SaveMode,
-    parameters: Map[String, String],
-    data: DataFrame): BaseRelation = {
-    val path = checkPath(parameters)
-    val filesystemPath = new Path(path)
-    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-    val doSave = if (fs.exists(filesystemPath)) {
-      mode match {
-        case SaveMode.Append =>
-          sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
-        case SaveMode.Overwrite =>
-          fs.delete(filesystemPath, true)
-          true
-        case SaveMode.ErrorIfExists =>
-          sys.error(s"path $path already exists.")
-        case SaveMode.Ignore => false
-      }
-    } else {
-      true
-    }
-
-    val codec: Class[_ <: CompressionCodec] =
-      parameters.getOrElse("codec", "none") match {
-        case "gzip" => classOf[GzipCodec]
-        case _ => null
-      }
-
-    if (doSave) {
-      // Only save data when the save mode is not ignore.
-      data.saveAsCsvFile(path, parameters, codec)
-    }
-
-    createRelation(sqlContext, parameters, data.schema)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
deleted file mode 100644
index 5a02bfd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-
-/**
- * RDD with one partition per table split of the given table. Creating an instance sets
- * the "spark.scheduler.pool" local property of the SparkContext to "DDL".
- *
- * @param sc           Spark context the RDD belongs to
- * @param valueClass   produces the value returned by the partition iterator
- * @param databaseName database of the table whose splits are looked up
- * @param tableName    table whose splits are looked up
- * @param partitioner  not used by the code in this class
- */
-class CarbonCleanFilesRDD[V: ClassTag](
-    sc: SparkContext,
-    valueClass: Value[V],
-    databaseName: String,
-    tableName: String,
-    partitioner: Partitioner)
-  extends RDD[V](sc, Nil) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  /** One CarbonLoadPartition per table split, indexed in split order. */
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
-  }
-
-  /**
-   * Logs the split and returns an iterator over it.
-   *
-   * NOTE(review): as written, hasNext sets `finished` on its first call and therefore always
-   * returns false, so no element is ever produced. Presumably this is a placeholder until
-   * the delete API mentioned in the TODO is called here; confirm before relying on it.
-   */
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-    val iter = new Iterator[(V)] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-      // TODO call CARBON delete API
-
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        valueClass.getValue(null)
-      }
-
-    }
-    iter
-  }
-
-  /** Returns the locations reported by the partition's table split. */
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonLoadPartition]
-    val locations = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    // A split may report no location at all; do not call head on an empty list.
-    val firstHost = locations.headOption.getOrElse("")
-    logInfo(s"Host Name: $firstHost, location count: ${locations.length}")
-    locations
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
deleted file mode 100644
index e306a89..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ /dev/null
@@ -1,604 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.carbondata.spark.rdd
-
-import java.lang.Long
-import java.text.SimpleDateFormat
-import java.util
-import java.util.UUID
-
-import scala.collection.JavaConverters._
-import scala.util.Random
-
-import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.DataLoadCoalescedRDD
-import org.apache.spark.rdd.DataLoadPartitionWrap
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.Row
-import org.apache.spark.util.TaskContextUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.common.logging.impl.StandardLogService
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
-import org.apache.carbondata.processing.constants.DataProcessorConstants
-import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
-import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.graphgenerator.GraphGenerator
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.DataLoadResult
-import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-
-/**
- * Partition backed by a single TableSplit and the block details assigned to it.
- * The split itself is transient; it is carried in a SerializableWritable wrapper.
- */
-class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit,
-    val blocksDetails: Array[BlockDetails])
-  extends Partition {
-
-  // Alias of the constructor argument under the name used for split partitions.
-  val partitionBlocksDetail = blocksDetails
-  val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
-  override val index: Int = idx
-
-  // Combines the owning RDD id with the partition index.
-  override def hashCode(): Int = (rddId + 41) * 41 + idx
-}
-
-/**
- * Partition backed by a host name and the block details assigned to that host.
- */
-class CarbonNodePartition(rddId: Int, val idx: Int, host: String,
-    val blocksDetails: Array[BlockDetails])
-  extends Partition {
-
-  // Alias of the constructor argument under the name used for node partitions.
-  val nodeBlocksDetail = blocksDetails
-  // Holds the host name itself.
-  val serializableHadoopSplit = host
-  override val index: Int = idx
-
-  // Combines the owning RDD id with the partition index.
-  override def hashCode(): Int = (rddId + 41) * 41 + idx
-}
-
-/**
- * Prepares the environment for one partition's data load and runs it.
- *
- * @param model               load model handed to CarbonLoaderUtil
- * @param splitIndex          index appended to the temporary store location
- * @param storePath           store path handed to CarbonLoaderUtil.executeGraph
- * @param kettleHomePath      kettle home path handed to CarbonLoaderUtil.executeGraph
- * @param loadCount           load count, used in log messages only
- * @param loadMetadataDetails receives the resulting load status
- */
-class SparkPartitionLoader(model: CarbonLoadModel,
-    splitIndex: Int,
-    storePath: String,
-    kettleHomePath: String,
-    loadCount: Int,
-    loadMetadataDetails: LoadMetadataDetails) {
-  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  var storeLocation: String = ""
-
-  /**
-   * Sets the carbon properties used for loading and chooses the temporary store location:
-   * <base dir>/<nanoTime>/<splitIndex>. The base dir is one of the configured local dirs
-   * when "carbon.use.local.dir" is true and such a dir exists, otherwise "java.io.tmpdir".
-   */
-  def initialize(): Unit = {
-    val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
-    if (null == carbonPropertiesFilePath) {
-      System.setProperty("carbon.properties.filepath",
-        System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
-    }
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
-    val carbonProperties = CarbonProperties.getInstance()
-    carbonProperties.addProperty("carbon.is.columnar.storage", "true")
-    carbonProperties.addProperty("carbon.dimension.split.value.in.columnar", "1")
-    carbonProperties.addProperty("carbon.is.fullyfilled.bits", "true")
-    carbonProperties.addProperty("is.int.based.indexer", "true")
-    carbonProperties.addProperty("aggregate.columnar.keyblock", "true")
-    carbonProperties.addProperty("high.cardinality.value", "100000")
-    carbonProperties.addProperty("is.compressed.keyblock", "false")
-    carbonProperties.addProperty("carbon.leaf.node.size", "120000")
-
-    // this property is used to determine whether temp location for carbon is inside
-    // container temp dir or is yarn application directory.
-    val carbonUseLocalDir = carbonProperties.getProperty("carbon.use.local.dir", "false")
-    val configuredDir: String = if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-      val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
-      if (null != storeLocations && storeLocations.nonEmpty) {
-        storeLocations(Random.nextInt(storeLocations.length))
-      } else {
-        null
-      }
-    } else {
-      null
-    }
-    // Fall back to the JVM temp dir whenever no usable local dir was found. Checking the
-    // field for null is not enough because it starts out as an empty string.
-    val baseLocation = if (configuredDir == null || configuredDir.isEmpty) {
-      System.getProperty("java.io.tmpdir")
-    } else {
-      configuredDir
-    }
-    storeLocation = baseLocation + '/' + System.nanoTime() + '/' + splitIndex
-  }
-
-  /**
-   * Executes the load graph and records the load status. A DataLoadingException whose
-   * error code is BAD_REC_FOUND marks the load as partially successful; every other
-   * exception propagates to the caller. The local load folder is always cleaned up,
-   * and a failure to do so is only logged.
-   */
-  def run(): Unit = {
-    try {
-      CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
-        kettleHomePath)
-      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-    } catch {
-      case e: DataLoadingException
-        if e.getErrorCode == DataProcessorConstants.BAD_REC_FOUND =>
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-        LOGGER.info("Bad Record Found")
-    } finally {
-      // delete temp location data
-      try {
-        val isCompaction = false
-        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, isCompaction)
-      } catch {
-        case e: Exception =>
-          LOGGER.error(e, "Failed to delete local data")
-      }
-      val loadStatus = loadMetadataDetails.getLoadStatus
-      if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(loadStatus)) {
-        LOGGER.info("DataLoad complete")
-        if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
-          LOGGER.info("Data Load partially successful with LoadCount:" + loadCount)
-        } else {
-          LOGGER.info("Data Loaded successfully with LoadCount:" + loadCount)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
-            model.getPartitionId)
-        }
-      }
-    }
-  }
-}
-
-/**
- * Use this RDD class to load csv data file
- *
- * @param sc                    The SparkContext to associate the RDD with.
- * @param result                Output result
- * @param carbonLoadModel       Carbon load model which contain the load info
- * @param storePath             The store location
- * @param kettleHomePath        The kettle home path
- * @param partitioner           Partitioner which specify how to partition
- * @param columinar             whether it is columinar
- * @param loadCount             Current load count
- * @param tableCreationTime     Time of creating table
- * @param schemaLastUpdatedTime Time of last schema update
- * @param blocksGroupBy         Blocks Array which is group by partition or host
- * @param isTableSplitPartition Whether using table split partition
- * @tparam K Class of the key associated with the Result.
- * @tparam V Class of the value associated with the Result.
- */
-class DataFileLoaderRDD[K, V](
-    sc: SparkContext,
-    result: DataLoadResult[K, V],
-    carbonLoadModel: CarbonLoadModel,
-    storePath: String,
-    kettleHomePath: String,
-    partitioner: Partitioner,
-    columinar: Boolean,
-    loadCount: Integer,
-    tableCreationTime: Long,
-    schemaLastUpdatedTime: Long,
-    blocksGroupBy: Array[(String, Array[BlockDetails])],
-    isTableSplitPartition: Boolean) extends RDD[(K, V)](sc, Nil) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    if (isTableSplitPartition) {
-      // for table split partition
-      var splits = Array[TableSplit]()
-      if (carbonLoadModel.isDirectLoad) {
-        splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
-      } else {
-        splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName, null)
-      }
-
-      splits.zipWithIndex.map { case (split, index) =>
-        // filter the same partition unique id, because only one will match, so get 0 element
-        val blocksDetails: Array[BlockDetails] = blocksGroupBy.filter { case (uniqueId, _) =>
-          uniqueId == split.getPartition.getUniqueID
-        }(0)._2
-        new CarbonTableSplitPartition(id, index, split, blocksDetails)
-      }
-    } else {
-      // for node partition
-      blocksGroupBy.zipWithIndex.map { case ((uniqueId, blockDetails), index) =>
-        new CarbonNodePartition(id, index, uniqueId, blockDetails)
-      }
-    }
-  }
-
-  override def checkpoint() {
-    // Do nothing. Hadoop RDD should not be checkpointed.
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val iter = new Iterator[(K, V)] {
-      var partitionID = "0"
-      val loadMetadataDetails = new LoadMetadataDetails()
-      var model: CarbonLoadModel = _
-      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
-                               theSplit.index
-      try {
-        loadMetadataDetails.setPartitionCount(partitionID)
-        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
-        setModelAndBlocksInfo()
-        val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
-          kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize
-        if (model.isRetentionRequest) {
-          recreateAggregationTableForRetention
-        } else if (model.isAggLoadRequest) {
-          loadMetadataDetails.setLoadStatus(createManualAggregateTable)
-        } else {
-          loader.run()
-        }
-      } catch {
-        case e: Exception =>
-          logInfo("DataLoad failure")
-          LOGGER.error(e)
-          throw e
-      }
-
-      def setModelAndBlocksInfo(): Unit = {
-        if (isTableSplitPartition) {
-          // for table split partition
-          val split = theSplit.asInstanceOf[CarbonTableSplitPartition]
-          logInfo("Input split: " + split.serializableHadoopSplit.value)
-          val blocksID = gernerateBlocksID
-          carbonLoadModel.setBlocksID(blocksID)
-          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          if (carbonLoadModel.isDirectLoad) {
-            model = carbonLoadModel.getCopyWithPartition(
-              split.serializableHadoopSplit.value.getPartition.getUniqueID,
-              split.serializableHadoopSplit.value.getPartition.getFilesPath,
-              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          } else {
-            model = carbonLoadModel.getCopyWithPartition(
-              split.serializableHadoopSplit.value.getPartition.getUniqueID)
-          }
-          partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-          // get this partition data blocks and put it to global static map
-          GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
-          StandardLogService.setThreadName(partitionID, null)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordPartitionBlockMap(
-            partitionID, split.partitionBlocksDetail.length)
-        } else {
-          // for node partition
-          val split = theSplit.asInstanceOf[CarbonNodePartition]
-          logInfo("Input split: " + split.serializableHadoopSplit)
-          logInfo("The Block Count in this node: " + split.nodeBlocksDetail.length)
-          CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordHostBlockMap(
-            split.serializableHadoopSplit, split.nodeBlocksDetail.length)
-          val blocksID = gernerateBlocksID
-          carbonLoadModel.setBlocksID(blocksID)
-          carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-          // set this node blocks info to global static map
-          GraphGenerator.blockInfo.put(blocksID, split.nodeBlocksDetail)
-          if (carbonLoadModel.isDirectLoad) {
-            val filelist: java.util.List[String] = new java.util.ArrayList[String](
-              CarbonCommonConstants.CONSTANT_SIZE_TEN)
-            CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, filelist, ",")
-            model = carbonLoadModel.getCopyWithPartition(partitionID, filelist,
-              carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
-          } else {
-            model = carbonLoadModel.getCopyWithPartition(partitionID)
-          }
-          StandardLogService.setThreadName(blocksID, null)
-        }
-      }
-
-      /**
-       * generate blocks id
-       *
-       * @return
-       */
-      def gernerateBlocksID: String = {
-        if (isTableSplitPartition) {
-          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-          theSplit.asInstanceOf[CarbonTableSplitPartition].serializableHadoopSplit.value
-            .getPartition.getUniqueID + "_" + UUID.randomUUID()
-        } else {
-          carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName + "_" +
-          UUID.randomUUID()
-        }
-      }
-
-      def checkAndLoadAggregationTable: String = {
-        val schema = model.getCarbonDataLoadSchema
-        val aggTables = schema.getCarbonTable.getAggregateTablesName
-        if (null != aggTables && !aggTables.isEmpty) {
-          val details = model.getLoadMetadataDetails.asScala.toArray
-          val newSlice = CarbonCommonConstants.LOAD_FOLDER + loadCount
-          var listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
-          listOfLoadFolders = CarbonLoaderUtil.addNewSliceNameToList(newSlice, listOfLoadFolders)
-          val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
-          var listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
-          listOfAllLoadFolders = CarbonLoaderUtil
-            .addNewSliceNameToList(newSlice, listOfAllLoadFolders)
-          val copyListOfLoadFolders = listOfLoadFolders.asScala.toList
-          val copyListOfUpdatedLoadFolders = listOfUpdatedLoadFolders.asScala.toList
-          loadTableSlices(listOfAllLoadFolders, details)
-          val loadFolders = Array[String]()
-          loadMetadataDetails.setLoadStatus(iterateOverAggTables(aggTables,
-            copyListOfLoadFolders.asJava, copyListOfUpdatedLoadFolders.asJava, loadFolders))
-          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
-            loadMetadataDetails.getLoadStatus)) {
-            // remove the current slice from memory not the table
-            CarbonLoaderUtil
-              .removeSliceFromMemory(model.getDatabaseName, model.getTableName, newSlice)
-            logInfo(s"Aggregate table creation failed")
-          } else {
-            logInfo("Aggregate tables creation successfull")
-          }
-        }
-        loadMetadataDetails.getLoadStatus
-      }
-
-      def loadTableSlices(listOfAllLoadFolders: java.util.List[String],
-          loadMetadataDetails: Array[LoadMetadataDetails]) = {
-        CarbonProperties.getInstance().addProperty("carbon.cache.used", "false")
-        // TODO: Implement it
-      }
-
-      def createManualAggregateTable: String = {
-        val details = model.getLoadMetadataDetails.asScala.toArray
-        val listOfAllLoadFolders = CarbonQueryUtil.getListOfSlices(details)
-        val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
-        val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
-        loadTableSlices(listOfAllLoadFolders, details)
-        val loadFolders = Array[String]()
-        val aggTable = model.getAggTableName
-        loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
-          listOfUpdatedLoadFolders, loadFolders))
-        if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
-          loadMetadataDetails.getLoadStatus)) {
-          logInfo(s"Aggregate table creation failed :: $aggTable")
-        } else {
-          logInfo(s"Aggregate table creation successfull :: $aggTable")
-        }
-        loadMetadataDetails.getLoadStatus
-      }
-
-      def recreateAggregationTableForRetention = {
-        val schema = model.getCarbonDataLoadSchema
-        val aggTables = schema.getCarbonTable.getAggregateTablesName
-        if (null != aggTables && !aggTables.isEmpty) {
-          val details = model.getLoadMetadataDetails.asScala.toArray
-          val listOfLoadFolders = CarbonLoaderUtil.getListOfValidSlices(details)
-          val listOfUpdatedLoadFolders = CarbonLoaderUtil.getListOfUpdatedSlices(details)
-          val listOfAllLoadFolder = CarbonQueryUtil.getListOfSlices(details)
-          loadTableSlices(listOfAllLoadFolder, details)
-          val loadFolders = Array[String]()
-          iterateOverAggTables(aggTables, listOfLoadFolders, listOfUpdatedLoadFolders, loadFolders)
-        }
-      }
-
-      // TODO Aggregate table needs to be handled
-      def iterateOverAggTables(aggTables: java.util.List[String],
-          listOfLoadFolders: java.util.List[String],
-          listOfUpdatedLoadFolders: java.util.List[String],
-          loadFolders: Array[String]): String = {
-        model.setAggLoadRequest(true)
-        aggTables.asScala.foreach { aggTable =>
-          model.setAggTableName(aggTable)
-          loadMetadataDetails.setLoadStatus(loadAggregationTable(listOfLoadFolders,
-            listOfUpdatedLoadFolders, loadFolders))
-          if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(
-            loadMetadataDetails.getLoadStatus)) {
-            logInfo(s"Aggregate table creation failed :: aggTable")
-            return loadMetadataDetails.getLoadStatus
-          }
-        }
-        loadMetadataDetails.getLoadStatus
-      }
-
-      def loadAggregationTable(listOfLoadFolders: java.util.List[String],
-          listOfUpdatedLoadFolders: java.util.List[String],
-          loadFolders: Array[String]): String = {
-        // TODO: Implement it
-        loadMetadataDetails.getLoadStatus
-      }
-
-      var finished = false
-
-      override def hasNext: Boolean = {
-        !finished
-      }
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
-      }
-    }
-    iter
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    if (isTableSplitPartition) {
-      // for table split partition
-      val theSplit = split.asInstanceOf[CarbonTableSplitPartition]
-      val location = theSplit.serializableHadoopSplit.value.getLocations.asScala
-      location
-    } else {
-      // for node partition
-      val theSplit = split.asInstanceOf[CarbonNodePartition]
-      val firstOptionLocation: Seq[String] = List(theSplit.serializableHadoopSplit)
-      logInfo("Preferred Location for split: " + firstOptionLocation.head)
-      val blockMap = new util.LinkedHashMap[String, Integer]()
-      val tableBlocks = theSplit.blocksDetails
-      tableBlocks.foreach { tableBlock =>
-        tableBlock.getLocations.foreach { location =>
-          if (!firstOptionLocation.exists(location.equalsIgnoreCase)) {
-            val currentCount = blockMap.get(location)
-            if (currentCount == null) {
-              blockMap.put(location, 1)
-            } else {
-              blockMap.put(location, currentCount + 1)
-            }
-          }
-        }
-      }
-
-      val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => {
-        nodeCount1.getValue > nodeCount2.getValue
-      }
-      )
-
-      val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2)
-      firstOptionLocation ++ sortedNodesList
-    }
-  }
-
-}
-
-/**
- * Use this RDD class to load RDD
- *
- * @param sc
- * @param result
- * @param carbonLoadModel
- * @param storePath
- * @param kettleHomePath
- * @param columinar
- * @param loadCount
- * @param tableCreationTime
- * @param schemaLastUpdatedTime
- * @param prev
- * @tparam K
- * @tparam V
- */
-class DataFrameLoaderRDD[K, V](
-    sc: SparkContext,
-    result: DataLoadResult[K, V],
-    carbonLoadModel: CarbonLoadModel,
-    storePath: String,
-    kettleHomePath: String,
-    columinar: Boolean,
-    loadCount: Integer,
-    tableCreationTime: Long,
-    schemaLastUpdatedTime: Long,
-    prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  @DeveloperApi
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val resultIter = new Iterator[(K, V)] {
-      var partitionID = "0"
-      val loadMetadataDetails = new LoadMetadataDetails()
-      var uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE +
-                               theSplit.index
-      try {
-        loadMetadataDetails.setPartitionCount(partitionID)
-        carbonLoadModel.setPartitionId(partitionID)
-        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
-        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
-        val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
-          kettleHomePath, loadCount, loadMetadataDetails)
-        loader.initialize
-        val rddIteratorKey = UUID.randomUUID().toString
-        try {
-          RddInputUtils.put(rddIteratorKey,
-              new PartitionIterator(
-                  firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
-                  carbonLoadModel,
-                  context))
-          carbonLoadModel.setRddIteratorKey(rddIteratorKey)
-          loader.run()
-        } finally {
-          RddInputUtils.remove(rddIteratorKey)
-        }
-      } catch {
-        case e: Exception =>
-          logInfo("DataLoad failure")
-          LOGGER.error(e)
-          throw e
-      }
-
-      var finished = false
-
-      override def hasNext: Boolean = !finished
-
-      override def next(): (K, V) = {
-        finished = true
-        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
-      }
-    }
-    resultIter
-  }
-
-  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
-}
-
-class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
-    carbonLoadModel: CarbonLoadModel,
-    context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
-  def hasNext: Boolean = partitionIter.hasNext
-  def next: JavaRddIterator[Array[String]] = {
-    val value = partitionIter.next
-    new RddIterator(value.rdd.iterator(value.partition, context),
-        carbonLoadModel,
-        context)
-  }
-  def initialize: Unit = {
-    TaskContextUtil.setTaskContext(context)
-  }
-}
-/**
- * This class wrap Scala's Iterator to Java's Iterator.
- * It also convert all columns to string data to use csv data loading flow.
- *
- * @param rddIter
- * @param carbonLoadModel
- * @param context
- */
-class RddIterator(rddIter: Iterator[Row],
-                  carbonLoadModel: CarbonLoadModel,
-                  context: TaskContext) extends JavaRddIterator[Array[String]] {
-
-  val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-  val format = new SimpleDateFormat(formatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
-      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-  def hasNext: Boolean = rddIter.hasNext
-
-  def next: Array[String] = {
-    val row = rddIter.next()
-    val columns = new Array[String](row.length)
-    for (i <- 0 until columns.length) {
-      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
-          delimiterLevel1, delimiterLevel2, format)
-    }
-    columns
-  }
-
-  def initialize: Unit = {
-    TaskContextUtil.setTaskContext(context)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c30ead7..a750493 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
 
+import com.databricks.spark.csv.newapi.CarbonTextFile
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
@@ -32,9 +33,9 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkContext, SparkEnv, SparkException}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel}
 import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.util.{FileUtils, SplitUtils}
+import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
@@ -43,7 +44,6 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType}
 import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.apache.carbondata.processing.etl.DataLoadingException
@@ -51,11 +51,10 @@ import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
+import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionCallable, CompactionType}
 import org.apache.carbondata.spark.partition.api.Partition
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil}
-
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil, LoadMetadataUtil}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -164,30 +163,6 @@ object CarbonDataRDDFactory {
     }
   }
 
-  def configSplitMaxSize(context: SparkContext, filePaths: String,
-      hadoopConfiguration: Configuration): Unit = {
-    val defaultParallelism = if (context.defaultParallelism < 1) {
-      1
-    } else {
-      context.defaultParallelism
-    }
-    val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
-    val blockSize =
-      hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
-    LOGGER.info("[Block Distribution]")
-    // calculate new block size to allow use all the parallelism
-    if (spaceConsumed < defaultParallelism * blockSize) {
-      var newSplitSize: Long = spaceConsumed / defaultParallelism
-      if (newSplitSize < CarbonCommonConstants.CARBON_16MB) {
-        newSplitSize = CarbonCommonConstants.CARBON_16MB
-      }
-      hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
-      LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " +
-                  s"defaultParallelism: $defaultParallelism")
-      LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
-    }
-  }
-
   def alterTableForCompaction(sqlContext: SQLContext,
       alterTableModel: AlterTableModel,
       carbonLoadModel: CarbonLoadModel,
@@ -206,11 +181,11 @@ object CarbonDataRDDFactory {
     LOGGER.audit(s"Compaction request received for table " +
                  s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+    val tableCreationTime = CarbonEnv.get.carbonMetastore
       .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      readLoadMetadataDetails(carbonLoadModel, storePath)
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
     }
     // reading the start time of data load.
     val loadStartTime = CarbonLoaderUtil.readCurrentTime()
@@ -384,7 +359,7 @@ object CarbonDataRDDFactory {
 
 
       // scan again and determine if anything is there to merge again.
-      readLoadMetadataDetails(carbonLoadModel, storePath)
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
       segList = carbonLoadModel.getLoadMetadataDetails
       // in case of major compaction we will scan only once and come out as it will keep
       // on doing major for the new loads also.
@@ -452,7 +427,7 @@ object CarbonDataRDDFactory {
       compactionLock: ICarbonLock): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
-    readLoadMetadataDetails(carbonLoadModel, storePath)
+    CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
     var segList: util.List[LoadMetadataDetails] = carbonLoadModel.getLoadMetadataDetails
 
     // clean up of the stale segments.
@@ -493,9 +468,8 @@ object CarbonDataRDDFactory {
             LOGGER.info("System level compaction lock is enabled.")
             val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
             var tableForCompaction = CarbonCompactionUtil
-              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
-                .tablesMeta.toArray, skipCompactionTables.toList.asJava
-              )
+                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata.tablesMeta.toArray,
+                  skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
                           s"${ tableForCompaction.carbonTable.getDatabaseName }." +
@@ -506,7 +480,7 @@ object CarbonDataRDDFactory {
 
               val newCarbonLoadModel = new CarbonLoadModel()
               prepareCarbonLoadModel(storePath, table, newCarbonLoadModel)
-              val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+              val tableCreationTime = CarbonEnv.get.carbonMetastore
                 .getTableCreationTime(newCarbonLoadModel.getDatabaseName,
                   newCarbonLoadModel.getTableName
                 )
@@ -548,7 +522,7 @@ object CarbonDataRDDFactory {
               }
               // ********* check again for all the tables.
               tableForCompaction = CarbonCompactionUtil
-                .getNextTableToCompact(CarbonEnv.getInstance(sqlContext).carbonCatalog.metadata
+                .getNextTableToCompact(CarbonEnv.get.carbonMetastore.metadata
                   .tablesMeta.toArray, skipCompactionTables.asJava
                 )
             }
@@ -581,7 +555,7 @@ object CarbonDataRDDFactory {
     newCarbonLoadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
     newCarbonLoadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
     newCarbonLoadModel.setStorePath(table.getStorePath)
-    readLoadMetadataDetails(newCarbonLoadModel, storePath)
+    CommonUtil.readLoadMetadataDetails(newCarbonLoadModel, storePath)
     val loadStartTime = CarbonLoaderUtil.readCurrentTime()
     newCarbonLoadModel.setFactTimeStamp(loadStartTime)
   }
@@ -604,7 +578,6 @@ object CarbonDataRDDFactory {
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
       kettleHomePath: String,
-      partitioner: Partitioner,
       columinar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
       useKettle: Boolean,
@@ -703,7 +676,7 @@ object CarbonDataRDDFactory {
       deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath,
         isForceDeletion = false)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
-        readLoadMetadataDetails(carbonLoadModel, storePath)
+        CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
       }
 
       var currentLoadCount = -1
@@ -741,9 +714,9 @@ object CarbonDataRDDFactory {
       // reading the start time of data load.
       val loadStartTime = CarbonLoaderUtil.readCurrentTime()
       carbonLoadModel.setFactTimeStamp(loadStartTime)
-      val tableCreationTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+      val tableCreationTime = CarbonEnv.get.carbonMetastore
         .getTableCreationTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val schemaLastUpdatedTime = CarbonEnv.getInstance(sqlContext).carbonCatalog
+      val schemaLastUpdatedTime = CarbonEnv.get.carbonMetastore
         .getSchemaLastUpdatedTime(carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
 
       // get partition way from configuration
@@ -776,7 +749,7 @@ object CarbonDataRDDFactory {
                 if (pathBuilder.nonEmpty) {
                   pathBuilder.substring(0, pathBuilder.size - 1)
                 }
-                (split.getPartition.getUniqueID, SplitUtils.getSplits(pathBuilder.toString(),
+                (split.getPartition.getUniqueID, SparkUtil.getSplits(pathBuilder.toString(),
                   sqlContext.sparkContext
                 ))
             }
@@ -795,7 +768,7 @@ object CarbonDataRDDFactory {
                 }
                 pathBuilder.append(split.getPartition.getUniqueID).append("/")
                 (split.getPartition.getUniqueID,
-                  SplitUtils.getSplits(pathBuilder.toString, sqlContext.sparkContext))
+                  SparkUtil.getSplits(pathBuilder.toString, sqlContext.sparkContext))
             }
           }
         } else {
@@ -817,7 +790,7 @@ object CarbonDataRDDFactory {
                org.apache.hadoop.io.compress.DefaultCodec,
                org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
 
-          configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
+          CarbonTextFile.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
 
           val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
           inputFormat match {
@@ -883,7 +856,6 @@ object CarbonDataRDDFactory {
             carbonLoadModel,
             storePath,
             kettleHomePath,
-            partitioner,
             columinar,
             currentLoadCount,
             tableCreationTime,
@@ -895,7 +867,6 @@ object CarbonDataRDDFactory {
           status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
             new DataLoadResultImpl(),
             carbonLoadModel,
-            partitioner,
             currentLoadCount,
             blocksGroupBy,
             isTableSplitPartition).collect()
@@ -930,8 +901,7 @@ object CarbonDataRDDFactory {
       }
 
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName,
-        partitioner.partitionCount, currentLoadCount.toString)
+        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
       var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       var errorMessage: String = "DataLoad failure"
       var executorMessage: String = ""
@@ -1034,17 +1004,11 @@ object CarbonDataRDDFactory {
 
   }
 
-  def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
-    val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
-    val details = SegmentStatusManager.readLoadMetadata(metadataPath)
-    model.setLoadMetadataDetails(details.toList.asJava)
-  }
-
   def deleteLoadsAndUpdateMetadata(
       carbonLoadModel: CarbonLoadModel,
       table: CarbonTable,
       storePath: String,
-      isForceDeletion: Boolean) {
+      isForceDeletion: Boolean): Unit = {
     if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) {
       val loadMetadataFilePath = CarbonLoaderUtil
         .extractLoadMetadataFileLocation(carbonLoadModel)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
deleted file mode 100644
index 343a602..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.load.LoadMetadataDetails
-import org.apache.carbondata.spark.DeletedLoadResult
-import org.apache.carbondata.spark.load.DeletedLoadMetadata
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-class CarbonDeleteLoadByDateRDD[K, V](
-    sc: SparkContext,
-    result: DeletedLoadResult[K, V],
-    databaseName: String,
-    tableName: String,
-    dateField: String,
-    dateFieldActualName: String,
-    dateValue: String,
-    factTableName: String,
-    dimTableName: String,
-    storePath: String,
-    loadMetadataDetails: List[LoadMetadataDetails])
-  extends RDD[(K, V)](sc, Nil) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map {s =>
-      new CarbonLoadPartition(id, s._2, s._1)
-    }
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
-    new Iterator[(K, V)] {
-      val deletedMetaData = new DeletedLoadMetadata()
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-      val partitionID = split.serializableHadoopSplit.value.getPartition.getUniqueID
-
-      // TODO call CARBON delete API
-      logInfo("Applying data retention as per date value " + dateValue)
-      var dateFormat = ""
-      try {
-        dateFormat = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-      } catch {
-        case e: Exception => logInfo("Unable to parse with default time format " + dateValue)
-      }
-      // TODO: Implement it
-      var finished = false
-
-      override def hasNext: Boolean = {
-        finished
-      }
-
-      override def next(): (K, V) = {
-        result.getKey(null, null)
-      }
-    }
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonLoadPartition]
-    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name: " + s.head + s.length)
-    s
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
deleted file mode 100644
index 26e1abc..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-class CarbonDeleteLoadRDD[V: ClassTag](
-    sc: SparkContext,
-    valueClass: Value[V],
-    loadId: Int,
-    databaseName: String,
-    tableName: String,
-    partitioner: Partitioner)
-  extends RDD[V](sc, Nil) {
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map {f =>
-      new CarbonLoadPartition(id, f._2, f._1)
-    }
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-    val iter = new Iterator[V] {
-      val split = theSplit.asInstanceOf[CarbonLoadPartition]
-      logInfo("Input split: " + split.serializableHadoopSplit.value)
-      // TODO call CARBON delete API
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        valueClass.getValue(null)
-      }
-
-    }
-    logInfo("********Deleting***************")
-    iter
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val theSplit = split.asInstanceOf[CarbonLoadPartition]
-    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
-    logInfo("Host Name: " + s.head + s.length)
-    s
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
deleted file mode 100644
index 47689bd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark.{Partition, SparkContext, TaskContext}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-
-import org.apache.carbondata.spark.Value
-import org.apache.carbondata.spark.util.CarbonQueryUtil
-
-class CarbonDropTableRDD[V: ClassTag](
-    sc: SparkContext,
-    valueClass: Value[V],
-    databaseName: String,
-    tableName: String)
-  extends RDD[V](sc, Nil) {
-
-  sc.setLocalProperty("spark.scheduler.pool", "DDL")
-
-  override def getPartitions: Array[Partition] = {
-    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
-    splits.zipWithIndex.map { s =>
-      new CarbonLoadPartition(id, s._2, s._1)
-    }
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
-
-    val iter = new Iterator[V] {
-      // TODO: Clear Btree from memory
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): V = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        valueClass.getValue(null)
-      }
-    }
-    iter
-  }
-}
-