Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:19:05 UTC

[08/16] carbondata git commit: [CARBONDATA-2168] Support global sort for standard hive partitioning
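
For context, this patch routes loads into standard Hive-partitioned carbon tables through the GLOBAL_SORT path. A minimal usage sketch, assuming a Carbon-enabled SparkSession (CarbonSession in this release) and the SORT_COLUMNS/SORT_SCOPE table properties and GLOBAL_SORT_PARTITIONS load option documented for this CarbonData generation; table and column names are made up:

    import org.apache.spark.sql.SparkSession

    // Illustrative only: exercises global sort on a hive-partitioned carbon table.
    val spark = SparkSession.builder().appName("global-sort-partition-sketch").getOrCreate()
    spark.sql(
      """CREATE TABLE sales (id INT, name STRING)
        |PARTITIONED BY (country STRING)
        |STORED BY 'carbondata'
        |TBLPROPERTIES ('SORT_COLUMNS'='name', 'SORT_SCOPE'='GLOBAL_SORT')""".stripMargin)
    spark.sql(
      """LOAD DATA INPATH '/tmp/sales.csv' INTO TABLE sales
        |OPTIONS ('GLOBAL_SORT_PARTITIONS'='4')""".stripMargin)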

http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 4806f9f..9ea58a9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
@@ -59,10 +61,12 @@ class CarbonDropPartitionRDD(
     val iter = new Iterator[String] {
       val split = theSplit.asInstanceOf[CarbonDropPartition]
       logInfo("Dropping partition information from : " + split.segmentPath)
-
+      partitions.toList.asJava
+      val partitionList = new util.ArrayList[util.List[String]]()
+      partitionList.add(partitions.toList.asJava)
       new PartitionMapFileStore().dropPartitions(
         split.segmentPath,
-        partitions.toList.asJava,
+        partitionList,
         uniqueId,
         partialMatch)
 

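The only functional change in this file is that PartitionMapFileStore.dropPartitions now takes a list of partition lists instead of a flat list, so the incoming Seq is wrapped one level deeper (the stray partitions.toList.asJava statement above the wrapping is a no-op). A standalone sketch of that wrapping, with a hypothetical helper name:

    import java.util
    import scala.collection.JavaConverters._

    // Hypothetical helper mirroring the wrapping above: one Seq of "col=value"
    // partition strings becomes a single-element java List of java Lists.
    def toNestedJavaList(partitions: Seq[String]): util.List[util.List[String]] = {
      val partitionList = new util.ArrayList[util.List[String]]()
      partitionList.add(partitions.toList.asJava)
      partitionList
    }
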
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 748945d..73be3c8 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -26,14 +26,18 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.command.DataTypeInfo
 import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
+import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
+import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
-import org.apache.carbondata.core.util.{ByteUtil, CarbonSessionInfo}
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
+import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
+import org.apache.carbondata.core.util.{CarbonSessionInfo, DataTypeUtil}
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
@@ -196,21 +200,85 @@ object CarbonScalaUtil {
   /**
    * Converts incoming value to String after converting data as per the data type.
    * @param value Input value to convert
-   * @param dataType Datatype to convert and then convert to String
-   * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
-   * @param dateFormat DataFormat to convert in case of DateType datatype
+   * @param column column to which the value belongs
    * @return converted String
    */
-  def convertToCarbonFormat(value: String,
-      dataType: DataType,
-      timeStampFormat: SimpleDateFormat,
-      dateFormat: SimpleDateFormat): String = {
+  def convertToCarbonFormat(
+      value: String,
+      column: CarbonColumn,
+      forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary],
+      table: CarbonTable): String = {
+    if (column.hasEncoding(Encoding.DICTIONARY)) {
+      if (column.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        if (column.getDataType.equals(CarbonDataTypes.TIMESTAMP)) {
+          val time = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+            column.getDataType,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+          ).getValueFromSurrogate(value.toInt).toString
+          return DateTimeUtils.timestampToString(time.toLong * 1000)
+        } else if (column.getDataType.equals(CarbonDataTypes.DATE)) {
+          val date = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+            column.getDataType,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
+          ).getValueFromSurrogate(value.toInt).toString
+          return DateTimeUtils.dateToString(date.toInt)
+        }
+      }
+      val dictionaryPath =
+        table.getTableInfo.getFactTable.getTableProperties.get(
+          CarbonCommonConstants.DICTIONARY_PATH)
+      val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
+        table.getAbsoluteTableIdentifier,
+        column.getColumnIdentifier, column.getDataType,
+        dictionaryPath)
+      return forwardDictionaryCache.get(
+        dictionaryColumnUniqueIdentifier).getDictionaryValueForKey(value.toInt)
+    }
     try {
-      dataType match {
-        case TimestampType =>
-          timeStampFormat.format(DateTimeUtils.stringToTime(value))
-        case DateType =>
-          dateFormat.format(DateTimeUtils.stringToTime(value))
+      column.getDataType match {
+        case CarbonDataTypes.TIMESTAMP =>
+          DateTimeUtils.timestampToString(value.toLong * 1000)
+        case CarbonDataTypes.DATE =>
+          DateTimeUtils.dateToString(DateTimeUtils.millisToDays(value.toLong))
+        case _ => value
+      }
+    } catch {
+      case e: Exception =>
+        value
+    }
+  }
+
+  /**
+   * Converts the given static partition value as per the column's data type and returns it as a String.
+   * @param value Input value to convert
+   * @param column column to which the value belongs
+   * @return converted String
+   */
+  def convertStaticPartitions(
+      value: String,
+      column: ColumnSchema,
+      table: CarbonTable): String = {
+    try {
+      if (column.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        if (column.getDataType.equals(CarbonDataTypes.TIMESTAMP)) {
+          return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+            column.getDataType,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
+          ).generateDirectSurrogateKey(value).toString
+        } else if (column.getDataType.equals(CarbonDataTypes.DATE)) {
+          return DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
+            column.getDataType,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
+          ).generateDirectSurrogateKey(value).toString
+        }
+      }
+      column.getDataType match {
+        case CarbonDataTypes.TIMESTAMP =>
+          DataTypeUtil.getDataDataTypeForNoDictionaryColumn(value,
+            column.getDataType,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT).toString
+        case CarbonDataTypes.DATE =>
+          DateTimeUtils.stringToDate(UTF8String.fromString(value)).get.toString
         case _ => value
       }
     } catch {
@@ -229,11 +297,11 @@ object CarbonScalaUtil {
       partitionSpec: Map[String, String],
       table: CarbonTable,
       timeFormat: SimpleDateFormat,
-      dateFormat: SimpleDateFormat,
-      serializationNullFormat: String,
-      badRecordAction: String,
-      isEmptyBadRecord: Boolean): Map[String, String] = {
+      dateFormat: SimpleDateFormat): Map[String, String] = {
     val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__"
+    val cacheProvider: CacheProvider = CacheProvider.getInstance
+    val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+      cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
     partitionSpec.map{ case (col, pvalue) =>
       // replace special string with empty value.
       val value = if (pvalue == null) {
@@ -246,17 +314,15 @@ object CarbonScalaUtil {
       val carbonColumn = table.getColumnByName(table.getTableName, col.toLowerCase)
       val dataType = CarbonScalaUtil.convertCarbonToSparkDataType(carbonColumn.getDataType)
       try {
-        if (isEmptyBadRecord && value.length == 0 &&
-            badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString) &&
-            dataType != StringType) {
-          (col, hiveignorepartition)
-        } else if (!isEmptyBadRecord && value.length == 0 && dataType != StringType) {
-          (col, hivedefaultpartition)
-        } else if (value.equals(hivedefaultpartition)) {
+        if (value.equals(hivedefaultpartition)) {
           (col, value)
         } else {
-          val convertedString = CarbonScalaUtil.convertToString(
-            value, dataType, timeFormat, dateFormat, serializationNullFormat)
+          val convertedString =
+            CarbonScalaUtil.convertToCarbonFormat(
+              value,
+              carbonColumn,
+              forwardDictionaryCache,
+              table)
           if (convertedString == null) {
             (col, hivedefaultpartition)
           } else {
@@ -265,13 +331,7 @@ object CarbonScalaUtil {
         }
       } catch {
         case e: Exception =>
-          // If it is bad record ignore case then add with special string so that it will be
-          // filtered after this.
-          if (badRecordAction.equalsIgnoreCase(LoggerAction.IGNORE.toString)) {
-            (col, hiveignorepartition)
-          } else {
-            (col, hivedefaultpartition)
-          }
+          (col, value)
       }
     }
   }
@@ -306,10 +366,7 @@ object CarbonScalaUtil {
           f.spec,
           table,
           timeFormat,
-          dateFormat,
-          serializeFormat,
-          badRecordAction,
-          isEmptyBadRecord)
+          dateFormat)
       f.copy(spec = changedSpec)
     }.filterNot{ p =>
       // Filter the special bad record ignore case string

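In the non-dictionary branch of convertToCarbonFormat above, timestamps arrive as epoch milliseconds and dates as millisecond values that reduce to epoch days, and Spark's DateTimeUtils renders them. A small self-contained illustration using the Spark 2.x-era signatures the patch itself relies on; the sample value is an assumption:

    import org.apache.spark.sql.catalyst.util.DateTimeUtils

    // Carbon hands back a millisecond epoch as a string; Spark's DateTimeUtils
    // expects microseconds for timestamps and epoch days for dates.
    val value = "1519718345000"                                        // assumed sample
    val asTimestamp = DateTimeUtils.timestampToString(value.toLong * 1000)
    val asDate = DateTimeUtils.dateToString(DateTimeUtils.millisToDays(value.toLong))
    println(s"$asTimestamp / $asDate")
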
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7d49c11..9bdaddb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
@@ -32,15 +33,15 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GenericInternalRow}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Expression, GenericInternalRow, SortOrder}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Sort}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{AtomicRunnableCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, FindDataSourceTable, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
@@ -55,7 +56,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.events.exception.PreEventException
@@ -65,12 +66,14 @@ import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions
+import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -505,7 +508,7 @@ case class CarbonLoadDataCommand(
       carbonLoadModel: CarbonLoadModel,
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame],
-      operationContext: OperationContext) = {
+      operationContext: OperationContext): Unit = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
     val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
@@ -544,17 +547,76 @@ case class CarbonLoadDataCommand(
       CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
       isEmptyBadRecord)
     CarbonSession.threadSet("partition.operationcontext", operationContext)
+    // Convert the input data (csv files or dataframe) to a logical plan
+    val allCols = new ArrayBuffer[String]()
+    allCols ++= table.getAllDimensions.asScala.map(_.getColName)
+    allCols ++= table.getAllMeasures.asScala.map(_.getColName)
+    var attributes =
+      StructType(allCols.map(StructField(_, StringType))).toAttributes
+
+    var partitionsLen = 0
+    val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
+    def transformQuery(rdd: RDD[Row], isDataFrame: Boolean) = {
+      val updatedRdd = convertData(rdd, sparkSession, carbonLoadModel, isDataFrame)
+      val catalogAttributes = catalogTable.schema.toAttributes
+      attributes = attributes.map(a => {
+        catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
+      })
+      attributes = attributes.map { attr =>
+        val column = table.getColumnByName(table.getTableName, attr.name)
+        if (column.hasEncoding(Encoding.DICTIONARY)) {
+          AttributeReference(
+            attr.name,
+            IntegerType,
+            attr.nullable,
+            attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+        } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
+          AttributeReference(
+            attr.name,
+            LongType,
+            attr.nullable,
+            attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+        } else {
+          attr
+        }
+      }
+      // Only select the required columns
+      val output = if (partition.nonEmpty) {
+        val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) }
+        catalogTable.schema.map { attr =>
+          attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+        }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
+      } else {
+        catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+      }
+      partitionsLen = rdd.partitions.length
+      val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession))
+      if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
+        val sortColumns = table.getSortColumns(table.getTableName)
+        Sort(output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)),
+          true,
+          child)
+      } else {
+        child
+      }
+    }
+
     try {
       val query: LogicalPlan = if (dataFrame.isDefined) {
         val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
         val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-        val attributes =
+        val dfAttributes =
           StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
-        val len = attributes.length
+        val partitionValues = if (partition.nonEmpty) {
+          partition.values.filter(_.nonEmpty).map(_.get).toArray
+        } else {
+          Array[String]()
+        }
+        val len = dfAttributes.length
         val rdd = dataFrame.get.rdd.map { f =>
           val data = new Array[Any](len)
           var i = 0
-          while (i < len) {
+          while (i < f.length) {
             data(i) =
               UTF8String.fromString(
                 CarbonScalaUtil.getString(f.get(i),
@@ -565,20 +627,32 @@ case class CarbonLoadDataCommand(
                   dateFormat))
             i = i + 1
           }
-          InternalRow.fromSeq(data)
+          if (partitionValues.length > 0) {
+            var j = 0
+            while (i < len) {
+              data(i) = UTF8String.fromString(partitionValues(j))
+              j = j + 1
+              i = i + 1
+            }
+          }
+          Row.fromSeq(data)
         }
-        if (updateModel.isDefined) {
+        val transRdd = if (updateModel.isDefined) {
           // Get the updated query plan in case of update scenario
-          getLogicalQueryForUpdate(sparkSession, catalogTable, attributes, rdd)
+          Dataset.ofRows(
+            sparkSession,
+            getLogicalQueryForUpdate(
+              sparkSession,
+              catalogTable,
+              dfAttributes,
+              rdd.map(row => InternalRow.fromSeq(row.toSeq)),
+              carbonLoadModel)).rdd
         } else {
-          LogicalRDD(attributes, rdd)(sparkSession)
+          rdd
         }
-
+        transformQuery(transRdd, true)
       } else {
-        // input data from csv files. Convert to logical plan
-        val attributes =
-          StructType(carbonLoadModel.getCsvHeaderColumns.map(
-            StructField(_, StringType))).toAttributes
+
         val rowDataTypes = attributes.map { attribute =>
           catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
             case Some(attr) => attr.dataType
@@ -592,41 +666,12 @@ case class CarbonLoadDataCommand(
             case _ => false
           }
         }
-        val len = rowDataTypes.length
-        var rdd =
-          DataLoadingUtil.csvFileScanRDD(
-            sparkSession,
-            model = carbonLoadModel,
-            hadoopConf)
-            .map { row =>
-              val data = new Array[Any](len)
-              var i = 0
-              val input = row.asInstanceOf[GenericInternalRow].values.asInstanceOf[Array[String]]
-              val inputLen = Math.min(input.length, len)
-              while (i < inputLen) {
-                data(i) = UTF8String.fromString(input(i))
-                // If partition column then update empty value with special string otherwise spark
-                // makes it as null so we cannot internally handle badrecords.
-                if (partitionColumns(i)) {
-                  if (input(i) != null && input(i).isEmpty) {
-                    data(i) = UTF8String.fromString(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
-                  }
-                }
-                i = i + 1
-              }
-              InternalRow.fromSeq(data)
-
-          }
-        // Only select the required columns
-        val output = if (partition.nonEmpty) {
-          val lowerCasePartition = partition.map{case(key, value) => (key.toLowerCase, value)}
-          catalogTable.schema.map { attr =>
-            attributes.find(_.name.equalsIgnoreCase(attr.name)).get
-          }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
-        } else {
-          catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
-        }
-        Project(output, LogicalRDD(attributes, rdd)(sparkSession))
+        val columnCount = carbonLoadModel.getCsvHeaderColumns.length
+        var rdd = DataLoadingUtil.csvFileScanRDD(
+          sparkSession,
+          model = carbonLoadModel,
+          hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
+        transformQuery(rdd.asInstanceOf[RDD[Row]], false)
       }
       val convertRelation = convertToLogicalRelation(
         catalogTable,
@@ -635,24 +680,29 @@ case class CarbonLoadDataCommand(
         carbonLoadModel,
         sparkSession,
         operationContext)
+      val logicalPlan = if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
+        var numPartitions =
+          CarbonDataProcessorUtil.getGlobalSortPartitions(carbonLoadModel.getGlobalSortPartitions)
+        if (numPartitions <= 0) {
+          numPartitions = partitionsLen
+        }
+        if (numPartitions > 0) {
+          Dataset.ofRows(sparkSession, query).repartition(numPartitions).logicalPlan
+        } else {
+          query
+        }
+      } else {
+        query
+      }
+
       val convertedPlan =
         CarbonReflectionUtils.getInsertIntoCommand(
           table = convertRelation,
           partition = partition,
-          query = query,
+          query = logicalPlan,
           overwrite = false,
           ifPartitionNotExists = false)
-      if (isOverwriteTable && partition.nonEmpty) {
-        overwritePartition(
-          sparkSession,
-          table,
-          convertedPlan,
-          serializationNullFormat,
-          badRecordAction,
-          isEmptyBadRecord.toBoolean)
-      } else {
-        Dataset.ofRows(sparkSession, convertedPlan)
-      }
+      Dataset.ofRows(sparkSession, convertedPlan)
     } finally {
       CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
       CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT)
@@ -660,6 +710,14 @@ case class CarbonLoadDataCommand(
       CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
       CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
       CarbonSession.threadUnset("partition.operationcontext")
+      if (isOverwriteTable) {
+        DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
+        // Clean the overwriting segments if any.
+        new PartitionMapFileStore().cleanSegments(
+          table,
+          CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava,
+          false)
+      }
     }
     try {
       // Trigger auto compaction
@@ -676,6 +734,48 @@ case class CarbonLoadDataCommand(
     }
   }
 
+  private def convertData(
+      originRDD: RDD[Row],
+      sparkSession: SparkSession,
+      model: CarbonLoadModel,
+      isDataFrame: Boolean): RDD[InternalRow] = {
+    model.setPartitionId("0")
+    val sc = sparkSession.sparkContext
+    val modelBroadcast = sc.broadcast(model)
+    val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
+
+    val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
+    // 1. Input
+    var convertRDD =
+      if (isDataFrame) {
+        originRDD.mapPartitions{rows =>
+          DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)
+        }
+      } else {
+        originRDD.map{row =>
+          val array = new Array[AnyRef](row.length)
+          var i = 0
+          while (i < array.length) {
+            array(i) = row.get(i).asInstanceOf[AnyRef]
+            i = i + 1
+          }
+          array
+        }
+      }
+    val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) =>
+        DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
+        DataLoadProcessorStepOnSpark.inputAndconvertFunc(
+          rows,
+          index,
+          modelBroadcast,
+          partialSuccessAccum,
+          inputStepRowCounter,
+          keepActualData = true)
+      }.filter(_ != null).map(row => InternalRow.fromSeq(row.getData))
+
+    finalRDD
+  }
+
   /**
    * Create the logical plan for update scenario. Here we should drop the segmentid column from the
    * input rdd.
@@ -684,7 +784,8 @@ case class CarbonLoadDataCommand(
       sparkSession: SparkSession,
       catalogTable: CatalogTable,
       attributes: Seq[AttributeReference],
-      rdd: RDD[InternalRow]): LogicalPlan = {
+      rdd: RDD[InternalRow],
+      carbonLoadModel: CarbonLoadModel): LogicalPlan = {
     sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
     // In case of update, we don't need the segmentid column in case of partitioning
     val dropAttributes = attributes.dropRight(1)
@@ -698,6 +799,8 @@ case class CarbonLoadDataCommand(
         }
       }.get
     }
+    carbonLoadModel.setCsvHeader(catalogTable.schema.map(_.name.toLowerCase).mkString(","))
+    carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(","))
     Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
   }
 
@@ -709,7 +812,16 @@ case class CarbonLoadDataCommand(
       sparkSession: SparkSession,
       operationContext: OperationContext): LogicalRelation = {
     val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
-    val metastoreSchema = StructType(catalogTable.schema.fields.map(_.copy(dataType = StringType)))
+    val metastoreSchema = StructType(catalogTable.schema.fields.map{f =>
+      val column = table.getColumnByName(table.getTableName, f.name)
+      if (column.hasEncoding(Encoding.DICTIONARY)) {
+        f.copy(dataType = IntegerType)
+      } else if (f.dataType == TimestampType || f.dataType == DateType) {
+        f.copy(dataType = LongType)
+      } else {
+        f
+      }
+    })
     val lazyPruningEnabled = sparkSession.sqlContext.conf.manageFilesourcePartitions
     val catalog = new CatalogFileIndex(
       sparkSession, catalogTable, sizeInBytes)
@@ -718,20 +830,18 @@ case class CarbonLoadDataCommand(
     } else {
       catalog.filterPartitions(Nil) // materialize all the partitions in memory
     }
-    val partitionSchema =
+    var partitionSchema =
       StructType(table.getPartitionInfo(table.getTableName).getColumnSchemaList.asScala.map(field =>
         metastoreSchema.fields.find(_.name.equalsIgnoreCase(field.getColumnName))).map(_.get))
-    val overWriteLocal = if (overWrite && partition.nonEmpty) {
-      false
-    } else {
-      overWrite
-    }
     val dataSchema =
       StructType(metastoreSchema
-        .filterNot(field => partitionSchema.contains(field.name)))
+        .filterNot(field => partitionSchema.contains(field)))
+    if (partition.nonEmpty) {
+      partitionSchema = StructType(partitionSchema.fields.map(_.copy(dataType = StringType)))
+    }
     val options = new mutable.HashMap[String, String]()
     options ++= catalogTable.storage.properties
-    options += (("overwrite", overWriteLocal.toString))
+    options += (("overwrite", overWrite.toString))
     options += (("onepass", loadModel.getUseOnePass.toString))
     options += (("dicthost", loadModel.getDictionaryServerHost))
     options += (("dictport", loadModel.getDictionaryServerPort.toString))
@@ -761,108 +871,6 @@ case class CarbonLoadDataCommand(
       Some(catalogTable))
   }
 
-  /**
-   * Overwrite the partition data if static partitions are specified.
-   * @param sparkSession
-   * @param table
-   * @param logicalPlan
-   */
-  private def overwritePartition(
-      sparkSession: SparkSession,
-      table: CarbonTable,
-      logicalPlan: LogicalPlan,
-      serializationNullFormat: String,
-      badRecordAction: String,
-      isEmptyBadRecord: Boolean): Unit = {
-    val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
-
-    // Update the partitions as per the datatype expect for time and datetype as we
-    // expect user provides the format in standard spark/hive formats.
-    val updatedPartitions = CarbonScalaUtil.updatePartitions(
-      partition.filter(_._2.isDefined).map(f => (f._1, f._2.get)),
-      table,
-      timeFormat = null,
-      dateFormat = null,
-      serializationNullFormat,
-      badRecordAction,
-      isEmptyBadRecord)
-    val existingPartitions = sparkSession.sessionState.catalog.listPartitions(
-      identifier,
-      Some(updatedPartitions))
-    val partitionNames = existingPartitions.toList.flatMap { partition =>
-      partition.spec.seq.map{case (column, value) => column + "=" + value}
-    }.toSet
-    val uniqueId = System.currentTimeMillis().toString
-    val segments = new SegmentStatusManager(
-      table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
-    // If any existing partitions need to be overwritten then drop from partitionmap
-    if (partitionNames.nonEmpty) {
-      try {
-        // First drop the partitions from partition mapper files of each segment
-        new CarbonDropPartitionRDD(
-          sparkSession.sparkContext,
-          table.getTablePath,
-          segments.asScala,
-          partitionNames.toSeq,
-          uniqueId,
-          partialMatch = false).collect()
-      } catch {
-        case e: Exception =>
-          // roll back the drop partitions from carbon store
-          new CarbonDropPartitionCommitRDD(
-            sparkSession.sparkContext,
-            table.getTablePath,
-            segments.asScala,
-            success = false,
-            uniqueId,
-            partitionNames.toSeq).collect()
-          throw e
-      }
-
-      try {
-        Dataset.ofRows(sparkSession, logicalPlan)
-      } catch {
-        case e: Exception =>
-          // roll back the drop partitions from carbon store
-          new CarbonDropPartitionCommitRDD(
-            sparkSession.sparkContext,
-            table.getTablePath,
-            segments.asScala,
-            success = false,
-            uniqueId,
-            partitionNames.toSeq).collect()
-          throw e
-      }
-      // Commit the removed partitions in carbon store.
-      new CarbonDropPartitionCommitRDD(
-        sparkSession.sparkContext,
-        table.getTablePath,
-        segments.asScala,
-        success = true,
-        uniqueId,
-        partitionNames.toSeq).collect()
-      // get valid segments
-      val validsegments =
-        new SegmentStatusManager(
-          table.getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments
-      // Update the loadstatus with update time to clear cache from driver.
-      CarbonUpdateUtil.updateTableMetadataStatus(
-        new util.HashSet[String](validsegments),
-        table,
-        uniqueId,
-        true,
-        new util.ArrayList[String])
-      DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
-      // Clean the overwriting segments if any.
-      new PartitionMapFileStore().cleanSegments(
-        table,
-        CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava,
-        false)
-    } else {
-      // Otherwise its a normal load
-      Dataset.ofRows(sparkSession, logicalPlan)
-    }
-  }
 
   def getDataFrameWithTupleID(): DataFrame = {
     val fields = dataFrame.get.schema.fields

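The net effect of transformQuery plus the GLOBAL_SORT branch above can be pictured with the public DataFrame API. A rough, illustrative equivalent that mirrors the order used in the patch (sort on the sort columns, then repartition to the configured partition count); column names and values are assumptions, and the real code builds the logical plan directly:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.col

    val spark = SparkSession.builder().master("local[2]").appName("sketch").getOrCreate()
    import spark.implicits._

    val input = Seq(("b", 2, "CN"), ("a", 1, "US")).toDF("name", "id", "country")
    val sortColumns = Seq("name")   // the table's SORT_COLUMNS
    val numPartitions = 4           // GLOBAL_SORT_PARTITIONS, else the input RDD's partition count

    // Global sort: one total ordering across the data, then the repartitioning
    // the command applies for the GLOBAL_SORT scope.
    val plan = input.sort(sortColumns.map(col): _*).repartition(numPartitions)
    plan.explain()
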
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 17749c8..d2c691b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -36,14 +36,17 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.DataSourceRegister
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -110,6 +113,7 @@ with Serializable {
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
+    model.setPartitionLoad(true)
     // Set the update timestamp if user sets in case of update query. It needs to be updated
     // in load status update time
     val updateTimeStamp = options.get("updatetimestamp")
@@ -231,7 +235,9 @@ private class CarbonOutputWriter(path: String,
     fieldTypes: Seq[DataType],
     taskNo : String)
   extends OutputWriter with AbstractCarbonOutputWriter {
-  val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
+  val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+  val partitions =
+    getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
   val staticPartition: util.HashMap[String, Boolean] = {
     val staticPart = context.getConfiguration.get("carbon.staticpartition")
     if (staticPart != null) {
@@ -272,24 +278,42 @@ private class CarbonOutputWriter(path: String,
       val formattedPartitions = updatedPartitions.map {case (col, value) =>
         // Static partition values are passed through here unchanged; the conversion to
         // carbon's internal format happens in updatePartitions below.
-        if (staticPartition.asScala.getOrElse(col, false)) {
-          (col, CarbonScalaUtil.convertToCarbonFormat(value,
-            CarbonScalaUtil.convertCarbonToSparkDataType(
-              table.getColumnByName(table.getTableName, col).getDataType),
-            timeFormat,
-            dateFormat))
-        } else {
-          (col, value)
-        }
+        (col, value)
       }
-      (formattedPartitions, formattedPartitions.map(_._2))
+      (formattedPartitions, updatePartitions(formattedPartitions.map(_._2)))
     } else {
-      (updatedPartitions, updatedPartitions.map(_._2))
+      (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
     }
   } else {
     (Map.empty[String, String].toArray, Array.empty)
   }
-  val writable = new StringArrayWritable()
+
+  val writable = new ObjectArrayWritable
+
+  private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
+    model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
+      .getColumnSchemaList.asScala.zipWithIndex.map { case (col, index) =>
+
+      val dataType = if (col.hasEncoding(Encoding.DICTIONARY)) {
+        DataTypes.INT
+      } else if (col.getDataType.equals(DataTypes.TIMESTAMP) ||
+                         col.getDataType.equals(DataTypes.DATE)) {
+        DataTypes.LONG
+      } else {
+        col.getDataType
+      }
+      if (staticPartition != null) {
+        DataTypeUtil.getDataBasedOnDataType(
+          CarbonScalaUtil.convertStaticPartitions(
+            partitionData(index),
+            col,
+            model.getCarbonDataLoadSchema.getCarbonTable),
+          dataType)
+      } else {
+        DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType)
+      }
+    }.toArray
+  }
 
   private val recordWriter: CarbonRecordWriter = {
     context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
@@ -302,11 +326,18 @@ private class CarbonOutputWriter(path: String,
 
   // TODO Implement writesupport interface to support writing Row directly to recordwriter
   def writeCarbon(row: InternalRow): Unit = {
-    val data = new Array[String](fieldTypes.length + partitionData.length)
+    val data = new Array[AnyRef](fieldTypes.length + partitionData.length)
     var i = 0
     while (i < fieldTypes.length) {
       if (!row.isNullAt(i)) {
-        data(i) = row.getString(i)
+        fieldTypes(i) match {
+          case StringType =>
+            data(i) = row.getString(i)
+          case d: DecimalType =>
+            data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+          case other =>
+            data(i) = row.get(i, other)
+        }
       }
       i += 1
     }
@@ -349,10 +380,7 @@ private class CarbonOutputWriter(path: String,
         updatedPartitions.toMap,
         table,
         timeFormat,
-        dateFormat,
-        serializeFormat,
-        badRecordAction,
-        isEmptyBadRecord)
+        dateFormat)
     formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
     new PartitionMapFileStore().writePartitionMapFile(
       segmentPath,
@@ -360,10 +388,13 @@ private class CarbonOutputWriter(path: String,
       partitonList)
   }
 
-  def getPartitionsFromPath(path: String, attemptContext: TaskAttemptContext): Array[String] = {
+  def getPartitionsFromPath(
+      path: String,
+      attemptContext: TaskAttemptContext,
+      model: CarbonLoadModel): Array[String] = {
     var attemptId = attemptContext.getTaskAttemptID.toString + "/"
     if (path.indexOf(attemptId) <= 0) {
-      val model = CarbonTableOutputFormat.getLoadModel(attemptContext.getConfiguration)
+
       attemptId = model.getTableName + "/"
     }
     val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))

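writeCarbon above now pulls each field out of the InternalRow with its declared Spark type instead of assuming everything is a String. The extraction pattern in isolation, as a sketch over public catalyst types only:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types._

    // Strings stay strings, decimals become java BigDecimal, everything else is
    // fetched with its declared type, mirroring the match in writeCarbon.
    def extract(row: InternalRow, i: Int, dt: DataType): AnyRef =
      if (row.isNullAt(i)) {
        null
      } else {
        dt match {
          case StringType     => row.getString(i)
          case d: DecimalType => row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
          case other          => row.get(i, other)
        }
      }
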
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
index fb78deb..dc2fbbb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataField.java
@@ -37,6 +37,8 @@ public class DataField implements Serializable {
 
   private String timestampFormat;
 
+  private boolean useActualData;
+
   public boolean hasDictionaryEncoding() {
     return column.hasEncoding(Encoding.DICTIONARY);
   }
@@ -60,4 +62,12 @@ public class DataField implements Serializable {
   public void setTimestampFormat(String timestampFormat) {
     this.timestampFormat = timestampFormat;
   }
+
+  public boolean isUseActualData() {
+    return useActualData;
+  }
+
+  public void setUseActualData(boolean useActualData) {
+    this.useActualData = useActualData;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f7eff81..ba24d41 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -41,6 +41,7 @@ import org.apache.carbondata.processing.loading.steps.DataConverterProcessorStep
 import org.apache.carbondata.processing.loading.steps.DataConverterProcessorWithBucketingStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterBatchProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl;
+import org.apache.carbondata.processing.loading.steps.InputProcessorStepForPartitionImpl;
 import org.apache.carbondata.processing.loading.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.loading.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
@@ -62,6 +63,8 @@ public final class DataLoadProcessBuilder {
       return buildInternalForBucketing(inputIterators, configuration);
     } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
       return buildInternalForBatchSort(inputIterators, configuration);
+    } else if (loadModel.isPartitionLoad()) {
+      return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
     } else {
       return buildInternal(inputIterators, configuration);
     }
@@ -96,6 +99,32 @@ public final class DataLoadProcessBuilder {
     return new CarbonRowDataWriterProcessorStepImpl(configuration, converterProcessorStep);
   }
 
+  /**
+   * Build the pipeline for a partition load
+   */
+  private AbstractDataLoadProcessorStep buildInternalForPartitionLoad(
+      CarbonIterator[] inputIterators, CarbonDataLoadConfiguration configuration,
+      SortScopeOptions.SortScope sortScope) {
+    // Wraps with dummy processor.
+    AbstractDataLoadProcessorStep inputProcessorStep =
+        new InputProcessorStepForPartitionImpl(configuration, inputIterators);
+    if (sortScope.equals(SortScopeOptions.SortScope.LOCAL_SORT)) {
+      AbstractDataLoadProcessorStep sortProcessorStep =
+          new SortProcessorStepImpl(configuration, inputProcessorStep);
+      //  Writes the sorted data in carbondata format.
+      return new DataWriterProcessorStepImpl(configuration, sortProcessorStep);
+    } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
+      //  Sorts the data by SortColumn or not
+      AbstractDataLoadProcessorStep sortProcessorStep =
+          new SortProcessorStepImpl(configuration, inputProcessorStep);
+      // Writes the sorted data in carbondata format.
+      return new DataWriterBatchProcessorStepImpl(configuration, sortProcessorStep);
+    } else {
+      // All other cases, like global sort and no sort, use this step
+      return new CarbonRowDataWriterProcessorStepImpl(configuration, inputProcessorStep);
+    }
+  }
+
   private AbstractDataLoadProcessorStep buildInternalForBatchSort(CarbonIterator[] inputIterators,
       CarbonDataLoadConfiguration configuration) {
     // 1. Reads the data input iterators and parses the data.

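The new buildInternalForPartitionLoad above wires a shorter pipeline depending on the sort scope. Its decision table, sketched in Scala; the step names are the real classes, while the sealed trait is only a stand-in for SortScopeOptions.SortScope:

    // Stand-in for SortScopeOptions.SortScope, just to show the routing.
    sealed trait SortScope
    case object NoSort extends SortScope
    case object LocalSort extends SortScope
    case object BatchSort extends SortScope
    case object GlobalSort extends SortScope

    def partitionLoadSteps(scope: SortScope): Seq[String] = scope match {
      case LocalSort =>
        Seq("InputProcessorStepForPartitionImpl", "SortProcessorStepImpl",
          "DataWriterProcessorStepImpl")
      case BatchSort =>
        Seq("InputProcessorStepForPartitionImpl", "SortProcessorStepImpl",
          "DataWriterBatchProcessorStepImpl")
      case _ => // GLOBAL_SORT and NO_SORT: no extra sort step in this pipeline
        Seq("InputProcessorStepForPartitionImpl", "CarbonRowDataWriterProcessorStepImpl")
    }
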
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 2d70f03..85eb19b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -47,6 +47,8 @@ public class MeasureFieldConverterImpl implements FieldConverter {
 
   private boolean isEmptyBadRecord;
 
+  private DataField dataField;
+
   public MeasureFieldConverterImpl(DataField dataField, String nullformat, int index,
       boolean isEmptyBadRecord) {
     this.dataType = dataField.getColumn().getDataType();
@@ -54,6 +56,7 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     this.nullformat = nullformat;
     this.index = index;
     this.isEmptyBadRecord = isEmptyBadRecord;
+    this.dataField = dataField;
   }
 
   @Override
@@ -85,7 +88,11 @@ public class MeasureFieldConverterImpl implements FieldConverter {
       row.update(null, index);
     } else {
       try {
-        output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+        if (dataField.isUseActualData()) {
+          output = DataTypeUtil.getConvertedMeasureValueBasedOnDataType(value, dataType, measure);
+        } else {
+          output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
+        }
         row.update(output, index);
       } catch (NumberFormatException e) {
         LOGGER.warn(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index ced37dd..9cf7fe4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -68,14 +68,25 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
         dateFormat = dataField.getTimestampFormat();
       }
       try {
-        byte[] value = DataTypeUtil
-            .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
-        if (dataType == DataTypes.STRING
-            && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-          throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-              + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+        if (!dataField.isUseActualData()) {
+          byte[] value = DataTypeUtil
+              .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
+          if (dataType == DataTypes.STRING
+              && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+            throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+          }
+          row.update(value, index);
+        } else {
+          Object value = DataTypeUtil
+              .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
+          if (dataType == DataTypes.STRING
+              && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+            throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
+                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+          }
+          row.update(value, index);
         }
-        row.update(value, index);
       } catch (CarbonDataLoadingException e) {
         throw e;
       } catch (Throwable ex) {
@@ -99,7 +110,9 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
   }
 
   private void updateWithNullValue(CarbonRow row) {
-    if (dataType == DataTypes.STRING) {
+    if (dataField.isUseActualData()) {
+      row.update(null, index);
+    } else if (dataType == DataTypes.STRING) {
       row.update(CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY, index);
     } else {
       row.update(CarbonCommonConstants.EMPTY_BYTE_ARRAY, index);

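The useActualData branch above keeps the parsed value instead of the UTF-8 byte encoding used by the classic load path. The difference for a no-dictionary timestamp column, shown in plain Scala; the format string and sample value are assumptions:

    import java.nio.charset.StandardCharsets
    import java.text.SimpleDateFormat

    val raw = "2018-02-27 08:19:05"
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    // Classic path: the dimension value is carried as UTF-8 bytes.
    val asBytes: Array[Byte] = raw.getBytes(StandardCharsets.UTF_8)
    // useActualData path: the actual typed value is carried through, so the
    // partitioned write can hand Spark a long directly.
    val asActual: Long = format.parse(raw).getTime
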
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 66943c8..9229598 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -29,7 +29,7 @@ import org.apache.commons.logging.LogFactory;
  * It is wrapper class to hold the rows in batches when record writer writes the data and allows
  * to iterate on it during data load. It uses blocking queue to coordinate between read and write.
  */
-public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
+public class CarbonOutputIteratorWrapper extends CarbonIterator<Object[]> {
 
   private static final Log LOG = LogFactory.getLog(CarbonOutputIteratorWrapper.class);
 
@@ -46,7 +46,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
 
   private ArrayBlockingQueue<RowBatch> queue = new ArrayBlockingQueue<>(10);
 
-  public void write(String[] row) throws InterruptedException {
+  public void write(Object[] row) throws InterruptedException {
     if (!loadBatch.addRow(row)) {
       loadBatch.readyRead();
       queue.put(loadBatch);
@@ -78,7 +78,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
   }
 
   @Override
-  public String[] next() {
+  public Object[] next() {
     return readBatch.next();
   }
 
@@ -100,16 +100,16 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
     }
   }
 
-  private static class RowBatch extends CarbonIterator<String[]> {
+  private static class RowBatch extends CarbonIterator<Object[]> {
 
     private int counter;
 
-    private String[][] batch;
+    private Object[][] batch;
 
     private int size;
 
     private RowBatch(int size) {
-      batch = new String[size][];
+      batch = new Object[size][];
       this.size = size;
     }
 
@@ -118,7 +118,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
      * @param row
      * @return false if the row cannot be added as batch is full.
      */
-    public boolean addRow(String[] row) {
+    public boolean addRow(Object[] row) {
       batch[counter++] = row;
       return counter < size;
     }
@@ -134,7 +134,7 @@ public class CarbonOutputIteratorWrapper extends CarbonIterator<String[]> {
     }
 
     @Override
-    public String[] next() {
+    public Object[] next() {
       assert (counter < size);
       return batch[counter++];
     }

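CarbonOutputIteratorWrapper above only changes its element type from String[] to Object[]; the hand-off it implements is a bounded producer/consumer queue between the record writer and the load pipeline. A minimal sketch of that idea (the real class adds batching, close handling and termination):

    import java.util.concurrent.ArrayBlockingQueue

    // Writer thread calls write(), the load pipeline's iterator calls read();
    // the bounded queue provides the back-pressure, now carrying Object[] rows.
    class RowQueue(capacity: Int) {
      private val queue = new ArrayBlockingQueue[Array[AnyRef]](capacity)
      def write(row: Array[AnyRef]): Unit = queue.put(row)  // blocks when full
      def read(): Array[AnyRef] = queue.take()              // blocks when empty
    }
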
http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index d41455f..4c536ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -188,6 +188,11 @@ public class CarbonLoadModel implements Serializable {
 
   private boolean isAggLoadRequest;
 
+  /**
+   * It writes data directly to the no-sort processor, bypassing all other processors.
+   */
+  private boolean isPartitionLoad;
+
   public boolean isAggLoadRequest() {
     return isAggLoadRequest;
   }
@@ -401,6 +406,7 @@ public class CarbonLoadModel implements Serializable {
     copy.batchSortSizeInMb = batchSortSizeInMb;
     copy.badRecordsLocation = badRecordsLocation;
     copy.isAggLoadRequest = isAggLoadRequest;
+    copy.isPartitionLoad = isPartitionLoad;
     return copy;
   }
 
@@ -454,6 +460,7 @@ public class CarbonLoadModel implements Serializable {
     copy.batchSortSizeInMb = batchSortSizeInMb;
     copy.isAggLoadRequest = isAggLoadRequest;
     copy.badRecordsLocation = badRecordsLocation;
+    copy.isPartitionLoad = isPartitionLoad;
     return copy;
   }
 
@@ -855,4 +862,12 @@ public class CarbonLoadModel implements Serializable {
   public void setSkipEmptyLine(String skipEmptyLine) {
     this.skipEmptyLine = skipEmptyLine;
   }
+
+  public boolean isPartitionLoad() {
+    return isPartitionLoad;
+  }
+
+  public void setPartitionLoad(boolean partitionLoad) {
+    isPartitionLoad = partitionLoad;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
new file mode 100644
index 0000000..1dc9b27
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepForPartitionImpl.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.loading.steps;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
+import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
+import org.apache.carbondata.processing.loading.DataField;
+import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl;
+import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * It reads data from the record reader and sends it to the next step.
+ */
+public class InputProcessorStepForPartitionImpl extends AbstractDataLoadProcessorStep {
+
+  private CarbonIterator<Object[]>[] inputIterators;
+
+  private boolean[] noDictionaryMapping;
+
+  private DataType[] dataTypes;
+
+  private int[] orderOfData;
+
+  public InputProcessorStepForPartitionImpl(CarbonDataLoadConfiguration configuration,
+      CarbonIterator<Object[]>[] inputIterators) {
+    super(configuration, null);
+    this.inputIterators = inputIterators;
+  }
+
+  @Override public DataField[] getOutput() {
+    return configuration.getDataFields();
+  }
+
+  @Override public void initialize() throws IOException {
+    super.initialize();
+    // The row converter here is used only as the cardinality finder for the configuration.
+    RowConverterImpl rowConverter =
+        new RowConverterImpl(configuration.getDataFields(), configuration, null);
+    rowConverter.initialize();
+    configuration.setCardinalityFinder(rowConverter);
+    noDictionaryMapping =
+        CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields());
+    dataTypes = new DataType[configuration.getDataFields().length];
+    for (int i = 0; i < dataTypes.length; i++) {
+      if (configuration.getDataFields()[i].getColumn().hasEncoding(Encoding.DICTIONARY)) {
+        dataTypes[i] = DataTypes.INT;
+      } else {
+        dataTypes[i] = configuration.getDataFields()[i].getColumn().getDataType();
+      }
+    }
+    orderOfData = arrangeData(configuration.getDataFields(), configuration.getHeader());
+  }
+
+  private int[] arrangeData(DataField[] dataFields, String[] header) {
+    int[] data = new int[dataFields.length];
+    for (int i = 0; i < dataFields.length; i++) {
+      for (int j = 0; j < header.length; j++) {
+        if (dataFields[i].getColumn().getColName().equalsIgnoreCase(header[j])) {
+          data[i] = j;
+          break;
+        }
+      }
+    }
+    return data;
+  }
+
+  @Override public Iterator<CarbonRowBatch>[] execute() {
+    int batchSize = CarbonProperties.getInstance().getBatchSize();
+    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
+    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
+    for (int i = 0; i < outIterators.length; i++) {
+      outIterators[i] =
+          new InputProcessorIterator(readerIterators[i], batchSize, configuration.isPreFetch(),
+              rowCounter, orderOfData, noDictionaryMapping, dataTypes);
+    }
+    return outIterators;
+  }
+
+  /**
+   * Partition the input iterators equally among the available parallel threads.
+   *
+   * @return an array of iterator lists, one list per parallel thread
+   */
+  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
+    // Get the number of cores configured in property.
+    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
+    // Get the minimum of number of cores and iterators size to get the number of parallel threads
+    // to be launched.
+    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
+
+    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
+    for (int i = 0; i < parallelThreadNumber; i++) {
+      iterators[i] = new ArrayList<>();
+    }
+    // Equally partition the iterators as per number of threads
+    for (int i = 0; i < inputIterators.length; i++) {
+      iterators[i % parallelThreadNumber].add(inputIterators[i]);
+    }
+    return iterators;
+  }
+
+  @Override protected CarbonRow processRow(CarbonRow row) {
+    return null;
+  }
+
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      for (CarbonIterator inputIterator : inputIterators) {
+        inputIterator.close();
+      }
+    }
+  }
+
+  @Override protected String getStepName() {
+    return "Input Processor";
+  }
+
+  /**
+   * This iterator wraps the list of input iterators and iterates over them one by one.
+   * It also parses the data while iterating.
+   */
+  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
+
+    private List<CarbonIterator<Object[]>> inputIterators;
+
+    private CarbonIterator<Object[]> currentIterator;
+
+    private int counter;
+
+    private int batchSize;
+
+    private boolean nextBatch;
+
+    private boolean firstTime;
+
+    private AtomicLong rowCounter;
+
+    private boolean[] noDictionaryMapping;
+
+    private DataType[] dataTypes;
+
+    private int[] orderOfData;
+
+    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, int batchSize,
+        boolean preFetch, AtomicLong rowCounter, int[] orderOfData, boolean[] noDictionaryMapping,
+        DataType[] dataTypes) {
+      this.inputIterators = inputIterators;
+      this.batchSize = batchSize;
+      this.counter = 0;
+      // Get the first iterator from the list.
+      currentIterator = inputIterators.get(counter++);
+      this.rowCounter = rowCounter;
+      this.nextBatch = false;
+      this.firstTime = true;
+      this.noDictionaryMapping = noDictionaryMapping;
+      this.dataTypes = dataTypes;
+      this.orderOfData = orderOfData;
+    }
+
+    @Override public boolean hasNext() {
+      return nextBatch || internalHasNext();
+    }
+
+    private boolean internalHasNext() {
+      if (firstTime) {
+        firstTime = false;
+        currentIterator.initialize();
+      }
+      boolean hasNext = currentIterator.hasNext();
+      // If iterator is finished then check for next iterator.
+      if (!hasNext) {
+        currentIterator.close();
+        // Check whether the next iterator is available in the list.
+        if (counter < inputIterators.size()) {
+          // Get the next iterator from the list.
+          currentIterator = inputIterators.get(counter++);
+          // Initialize the new iterator
+          currentIterator.initialize();
+          hasNext = internalHasNext();
+        }
+      }
+      return hasNext;
+    }
+
+    @Override public CarbonRowBatch next() {
+      return getBatch();
+    }
+
+    private CarbonRowBatch getBatch() {
+      // Create batch and fill it.
+      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
+      int count = 0;
+      while (internalHasNext() && count < batchSize) {
+        carbonRowBatch.addRow(new CarbonRow(convertToNoDictionaryToBytes(currentIterator.next())));
+        count++;
+      }
+      rowCounter.getAndAdd(carbonRowBatch.getSize());
+      return carbonRowBatch;
+    }
+
+    private Object[] convertToNoDictionaryToBytes(Object[] data) {
+      Object[] newData = new Object[data.length];
+      for (int i = 0; i < noDictionaryMapping.length; i++) {
+        if (noDictionaryMapping[i]) {
+          newData[i] = DataTypeUtil
+              .getBytesDataDataTypeForNoDictionaryColumn(data[orderOfData[i]], dataTypes[i]);
+        } else {
+          newData[i] = data[orderOfData[i]];
+        }
+      }
+      if (newData.length > noDictionaryMapping.length) {
+        for (int i = noDictionaryMapping.length; i < newData.length; i++) {
+          newData[i] = data[orderOfData[i]];
+        }
+      }
+      return newData;
+    }
+
+  }
+
+}
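
The new InputProcessorStepForPartitionImpl distributes the record-reader iterators across threads with a simple round-robin scheme (iterator i goes to bucket i % threads) and then converts no-dictionary columns to bytes while batching. A standalone sketch of just the round-robin split, using plain java.util.Iterator instead of CarbonIterator to stay self-contained:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public final class RoundRobinSplitSketch {
  /** Splits the input iterators into min(inputs, cores) buckets, round-robin. */
  public static <T> List<List<Iterator<T>>> split(List<Iterator<T>> inputs, int cores) {
    int threads = Math.min(inputs.size(), cores);
    List<List<Iterator<T>>> buckets = new ArrayList<>();
    for (int i = 0; i < threads; i++) {
      buckets.add(new ArrayList<Iterator<T>>());
    }
    for (int i = 0; i < inputs.size(); i++) {
      buckets.get(i % threads).add(inputs.get(i));
    }
    return buckets;
  }

  public static void main(String[] args) {
    List<Iterator<String>> inputs = Arrays.asList(
        Collections.singletonList("a").iterator(),
        Collections.singletonList("b").iterator(),
        Collections.singletonList("c").iterator());
    // With 2 cores: bucket 0 gets iterators 0 and 2, bucket 1 gets iterator 1.
    System.out.println(split(inputs, 2).size());  // 2
  }
}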

http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 06522a4..b795696 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -558,6 +558,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     @Override public Void call() throws Exception {
       try {
         TablePage tablePage = processDataRows(dataRows);
+        dataRows = null;
         tablePage.setIsLastPage(isLastPage);
         // insert the object in array according to sequence number
         int indexInNodeHolderArray = (pageId - 1) % numberOfCores;
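
Setting dataRows to null right after the page is built is a memory hint: the input batch becomes garbage-collectable while the comparatively slow page encoding and compression continue. A tiny sketch of the same pattern, with hypothetical stand-in names:

import java.util.ArrayList;
import java.util.List;

public final class ReleaseEarlySketch {
  public static void main(String[] args) {
    List<Object[]> dataRows = new ArrayList<>();
    dataRows.add(new Object[] {1, "a"});
    int pageRowCount = dataRows.size();  // stands in for processDataRows(dataRows)
    dataRows = null;                     // batch is GC-eligible before any slow follow-up work
    System.out.println("page built from " + pageRowCount + " row(s)");
  }
}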

http://git-wip-us.apache.org/repos/asf/carbondata/blob/758d03e7/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 2a4cc00..376a546 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -482,22 +482,19 @@ public final class CarbonDataProcessorUtil {
 
   /**
    * Get the number of partitions in global sort
-   * @param configuration
+   * @param globalSortPartitions the number of partitions specified in the load DDL, or null
    * @return the number of partitions
    */
-  public static int getGlobalSortPartitions(CarbonDataLoadConfiguration configuration) {
+  public static int getGlobalSortPartitions(Object globalSortPartitions) {
     int numPartitions;
     try {
       // First try to get the number from ddl, otherwise get it from carbon properties.
-      if (configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)
-          == null) {
+      if (globalSortPartitions == null) {
         numPartitions = Integer.parseInt(CarbonProperties.getInstance()
           .getProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
             CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT));
       } else {
-        numPartitions = Integer.parseInt(
-          configuration.getDataLoadProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS)
-            .toString());
+        numPartitions = Integer.parseInt(globalSortPartitions.toString());
       }
     } catch (Exception e) {
       numPartitions = 0;