Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:49 UTC

[11/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
new file mode 100644
index 0000000..d2e716f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.Partition;
+import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
+import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
+import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
+import org.apache.carbondata.spark.splits.TableSplit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This utility parses the Carbon query plan into the actual query model object.
+ */
+public final class CarbonQueryUtil {
+
+  private CarbonQueryUtil() {
+
+  }
+
+  /**
+   * Creates one split for each region server.
+   */
+  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
+      CarbonQueryPlan queryPlan) throws IOException {
+
+    // Just create splits depending on the locations of the region servers
+    List<Partition> allPartitions = null;
+    if (queryPlan == null) {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
+    } else {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+    }
+    TableSplit[] splits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < splits.length; i++) {
+      splits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = QueryPartitionHelper.getInstance()
+          .getLocation(partition, databaseName, tableName);
+      locations.add(location);
+      splits[i].setPartition(partition);
+      splits[i].setLocations(locations);
+    }
+
+    return splits;
+  }
+
+  /**
+   * Creates one split for each region server.
+   */
+  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
+
+    // Just create splits depending on the locations of the region servers
+    DefaultLoadBalancer loadBalancer = null;
+    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
+    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < tblSplits.length; i++) {
+      tblSplits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = loadBalancer.getNodeForPartitions(partition);
+      locations.add(location);
+      tblSplits[i].setPartition(partition);
+      tblSplits[i].setLocations(locations);
+    }
+    return tblSplits;
+  }
+
+  /**
+   * Splits sourcePath on the given separator and adds each part to partitionsFiles.
+   */
+  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
+      String separator) {
+    if (StringUtils.isNotEmpty(sourcePath)) {
+      String[] files = sourcePath.split(separator);
+      for (String file : files) {
+        partitionsFiles.add(file);
+      }
+    }
+  }
+
+  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
+    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
+    List<Partition> partitionList =
+        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
+
+    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
+    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
+
+    for (int i = 0; i < files.size(); i++) {
+      partitionFiles.get(i % 1).add(files.get(i));
+    }
+    return partitionList;
+  }
+
+  public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
+    List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    if (null != details) {
+      for (LoadMetadataDetails oneLoad : details) {
+        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
+          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
+          slices.add(loadName);
+        }
+      }
+    }
+    return slices;
+  }
+
+}
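
For reference, a minimal sketch (not part of the patch) of how the direct-load helpers above might be driven from Scala; the input paths are hypothetical and only illustrate the comma-separated form the utility expects:

    import org.apache.carbondata.spark.util.CarbonQueryUtil

    val sourcePath = "/tmp/load/part1.csv,/tmp/load/part2.csv"  // hypothetical paths

    // splitFilePath simply splits on the supplied separator
    val files = new java.util.ArrayList[String]()
    CarbonQueryUtil.splitFilePath(sourcePath, files, ",")
    // files now holds ["/tmp/load/part1.csv", "/tmp/load/part2.csv"]

    // getAllFilesForDataLoad groups every file into a single partition,
    // so exactly one TableSplit comes back here
    val splits = CarbonQueryUtil.getTableSplitsForDirectLoad(sourcePath)
    println(s"number of splits: ${splits.length}")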

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
new file mode 100644
index 0000000..11cf9f8
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Project Name  : Carbon
+ * Module Name   : CARBON Data Processor
+ * Author    : R00903928
+ * Created Date  : 15-Sep-2015
+ * FileName   : LoadMetadataUtil.java
+ * Description   : Utility to check whether any load is marked for deletion or compaction cleanup
+ * Class Version  : 1.0
+ */
+package org.apache.carbondata.spark.util;
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+
+public final class LoadMetadataUtil {
+  private LoadMetadataUtil() {
+
+  }
+
+  public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) {
+    CarbonTable table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
+        .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName());
+
+    String metaDataLocation = table.getMetaDataFilepath();
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    if (details != null && details.length != 0) {
+      for (LoadMetadataDetails oneRow : details) {
+        if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus())
+            || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneRow.getLoadStatus()))
+            && oneRow.getVisibility().equalsIgnoreCase("true")) {
+          return true;
+        }
+      }
+    }
+
+    return false;
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala
new file mode 100644
index 0000000..0b652ef
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonAliasDecoderRelation.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
+
+case class CarbonAliasDecoderRelation() {
+
+  val attrMap = new java.util.HashMap[AttributeReferenceWrapper, Attribute]
+
+  def put(key: Attribute, value: Attribute): Unit = {
+    attrMap.put(AttributeReferenceWrapper(key), value)
+  }
+
+  def getOrElse(key: Attribute, default: Attribute): Attribute = {
+    val value = attrMap.get(AttributeReferenceWrapper(key))
+    if (value == null) {
+      default
+    } else {
+      if (value.equals(key)) {
+        value
+      } else {
+        getOrElse(value, value)
+      }
+    }
+  }
+}
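
For reference, a minimal sketch (not part of the patch) of the alias resolution above, assuming AttributeReferenceWrapper compares on the wrapped attribute as its use in the optimizer suggests:

    import org.apache.carbondata.spark.CarbonAliasDecoderRelation
    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.types.IntegerType

    val original = AttributeReference("a", IntegerType)()
    val alias = AttributeReference("a_alias", IntegerType)()

    val decoder = CarbonAliasDecoderRelation()
    decoder.put(alias, original)

    decoder.getOrElse(alias, alias)        // follows the chain and returns `original`
    decoder.getOrElse(original, original)  // no mapping, so the supplied default is returned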

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
new file mode 100644
index 0000000..ea97bca
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+ /**
+  * Carbon column validator
+  */
+class CarbonColumnValidator extends ColumnValidator {
+  def validateColumns(allColumns: Seq[ColumnSchema]) {
+    allColumns.foreach { columnSchema =>
+      val colWithSameId = allColumns.filter { x =>
+        x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
+      }
+      if (colWithSameId.size > 1) {
+        throw new MalformedCarbonCommandException("Two columns cannot have the same columnId")
+      }
+    }
+  }
+}
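
For reference, a sketch (not part of the patch) of the duplicate-columnId check, assuming ColumnSchema offers a no-arg constructor and a setColumnUniqueId setter matching the getter used above:

    import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
    import org.apache.carbondata.spark.CarbonColumnValidator

    val col1 = new ColumnSchema()    // assumed no-arg constructor
    col1.setColumnUniqueId("id-1")   // assumed setter
    val col2 = new ColumnSchema()
    col2.setColumnUniqueId("id-1")

    // throws MalformedCarbonCommandException because both columns share "id-1"
    new CarbonColumnValidator().validateColumns(Seq(col1, col2))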

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
new file mode 100644
index 0000000..2a580dc
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
+import org.apache.carbondata.scan.expression.conditional._
+import org.apache.carbondata.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * All filter conversions are done here.
+ */
+object CarbonFilters {
+
+  /**
+   * Converts data sources filters to carbon filter predicates.
+   */
+  def createCarbonFilter(schema: StructType,
+      predicate: sources.Filter): Option[CarbonExpression] = {
+    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
+      predicate match {
+
+        case sources.EqualTo(name, value) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.Not(sources.EqualTo(name, value)) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+
+        case sources.EqualNullSafe(name, value) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.Not(sources.EqualNullSafe(name, value)) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+
+        case sources.GreaterThan(name, value) =>
+          Some(new GreaterThanExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.LessThan(name, value) =>
+          Some(new LessThanExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.GreaterThanOrEqual(name, value) =>
+          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.LessThanOrEqual(name, value) =>
+          Some(new LessThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+
+        case sources.In(name, values) =>
+          Some(new InExpression(getCarbonExpression(name),
+            new ListExpression(
+              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+        case sources.Not(sources.In(name, values)) =>
+          Some(new NotInExpression(getCarbonExpression(name),
+            new ListExpression(
+              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+
+        case sources.And(lhs, rhs) =>
+          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
+
+        case sources.Or(lhs, rhs) =>
+          for {
+            lhsFilter <- createFilter(lhs)
+            rhsFilter <- createFilter(rhs)
+          } yield {
+            new OrExpression(lhsFilter, rhsFilter)
+          }
+
+        case _ => None
+      }
+    }
+
+    def getCarbonExpression(name: String) = {
+      new CarbonColumnExpression(name,
+        CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+    }
+
+    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
+      new CarbonLiteralExpression(value,
+        CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+    }
+
+    createFilter(predicate)
+  }
+
+
+  // Check which filters can be pushed down to Carbon; the rest are handled in the Spark layer.
+  // Mostly only dimension filters are pushed down, since they are evaluated faster in Carbon.
+  def selectFilters(filters: Seq[Expression],
+      attrList: java.util.HashSet[AttributeReferenceWrapper],
+      aliasMap: CarbonAliasDecoderRelation): Unit = {
+    def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
+      expr match {
+        case or@ Or(left, right) =>
+
+          val leftFilter = translate(left, or = true)
+          val rightFilter = translate(right, or = true)
+          if (leftFilter.isDefined && rightFilter.isDefined) {
+            Some( sources.Or(leftFilter.get, rightFilter.get))
+          } else {
+            or.collect {
+              case attr: AttributeReference =>
+                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+            None
+          }
+
+        case And(left, right) =>
+          (translate(left) ++ translate(right)).reduceOption(sources.And)
+
+        case EqualTo(a: Attribute, Literal(v, t)) =>
+          Some(sources.EqualTo(a.name, v))
+        case EqualTo(l@Literal(v, t), a: Attribute) =>
+          Some(sources.EqualTo(a.name, v))
+        case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.EqualTo(a.name, v))
+        case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.EqualTo(a.name, v))
+
+        case Not(EqualTo(a: Attribute, Literal(v, t))) => new
+            Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Literal(v, t), a: Attribute)) => new
+            Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => new
+            Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => new
+            Some(sources.Not(sources.EqualTo(a.name, v)))
+        case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+        case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+        case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.Not(sources.In(a.name, hSet.toArray)))
+        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.In(a.name, hSet.toArray))
+        case Not(In(Cast(a: Attribute, _), list))
+          if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.Not(sources.In(a.name, hSet.toArray)))
+        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.In(a.name, hSet.toArray))
+
+        case GreaterThan(a: Attribute, Literal(v, t)) =>
+          Some(sources.GreaterThan(a.name, v))
+        case GreaterThan(Literal(v, t), a: Attribute) =>
+          Some(sources.LessThan(a.name, v))
+        case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.GreaterThan(a.name, v))
+        case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.LessThan(a.name, v))
+
+        case LessThan(a: Attribute, Literal(v, t)) =>
+          Some(sources.LessThan(a.name, v))
+        case LessThan(Literal(v, t), a: Attribute) =>
+          Some(sources.GreaterThan(a.name, v))
+        case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.LessThan(a.name, v))
+        case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.GreaterThan(a.name, v))
+
+        case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+        case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+        case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+        case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+
+        case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+        case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+        case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+        case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+
+        case others =>
+          if (!or) {
+            others.collect {
+              case attr: AttributeReference =>
+                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+          }
+          None
+      }
+    }
+    filters.flatMap(translate(_, false)).toArray
+  }
+
+  def processExpression(exprs: Seq[Expression],
+      attributesNeedToDecode: java.util.HashSet[AttributeReference],
+      unprocessedExprs: ArrayBuffer[Expression],
+      carbonTable: CarbonTable): Option[CarbonExpression] = {
+    def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
+      expr match {
+        case or@ Or(left, right) =>
+          val leftFilter = transformExpression(left, true)
+          val rightFilter = transformExpression(right, true)
+          if (leftFilter.isDefined && rightFilter.isDefined) {
+            Some(new OrExpression(leftFilter.get, rightFilter.get))
+          } else {
+            or.collect {
+              case attr: AttributeReference => attributesNeedToDecode.add(attr)
+            }
+            unprocessedExprs += or
+            None
+          }
+
+        case And(left, right) =>
+          (transformExpression(left) ++ transformExpression(right)).reduceOption(new
+              AndExpression(_, _))
+
+        case EqualTo(a: Attribute, l@Literal(v, t)) => new
+            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+        case EqualTo(l@Literal(v, t), a: Attribute) => new
+            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) => new
+            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) => new
+            Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+
+        case Not(EqualTo(a: Attribute, l@Literal(v, t))) => new
+            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case Not(EqualTo(l@Literal(v, t), a: Attribute)) => new
+            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) => new
+            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) => new
+            Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case IsNotNull(child: Attribute) =>
+            Some(new NotEqualsExpression(transformExpression(child).get,
+             transformExpression(Literal(null)).get, true))
+        case IsNull(child: Attribute) =>
+            Some(new EqualToExpression(transformExpression(child).get,
+             transformExpression(Literal(null)).get, true))
+        case Not(In(a: Attribute, list))
+          if !list.exists(!_.isInstanceOf[Literal]) =>
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get,
+              new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+          }
+        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          Some(new InExpression(transformExpression(a).get,
+            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+        case Not(In(Cast(a: Attribute, _), list))
+          if !list.exists(!_.isInstanceOf[Literal]) =>
+          /* If an illogical expression such as NOT IN('scala', NULL) appears in a NOT IN
+           filter, it will be treated as a false expression and will
+           always return no result. */
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get, new ListExpression(
+              convertToJavaList(list.map(transformExpression(_).get)))))
+          }
+        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          Some(new InExpression(transformExpression(a).get,
+            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+
+        case GreaterThan(a: Attribute, l@Literal(v, t)) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case GreaterThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case GreaterThan(l@Literal(v, t), a: Attribute) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case GreaterThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+
+        case LessThan(a: Attribute, l@Literal(v, t)) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case LessThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case LessThan(l@Literal(v, t), a: Attribute) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case LessThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+
+        case GreaterThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case GreaterThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case GreaterThanOrEqual(l@Literal(v, t), a: Attribute) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case GreaterThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+
+        case LessThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case LessThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case LessThanOrEqual(l@Literal(v, t), a: Attribute) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case LessThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+
+        case AttributeReference(name, dataType, _, _) =>
+          Some(new CarbonColumnExpression(name,
+            CarbonScalaUtil.convertSparkToCarbonDataType(
+              getActualCarbonDataType(name, carbonTable))))
+        case Literal(name, dataType) => Some(new
+            CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+        case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
+        case others =>
+          if (!or) {
+            others.collect {
+              case attr: AttributeReference => attributesNeedToDecode.add(attr)
+            }
+            unprocessedExprs += others
+          }
+          None
+      }
+    }
+    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
+  }
+  private def isNullLiteral(exp: Expression): Boolean = {
+    if (null != exp
+        && exp.isInstanceOf[Literal]
+        && ((exp.asInstanceOf[Literal].dataType == org.apache.spark.sql.types.DataTypes.NullType)
+        || (exp.asInstanceOf[Literal].value == null))) {
+      true
+    } else {
+      false
+    }
+  }
+  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
+    var carbonColumn: CarbonColumn =
+      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
+    val dataType = if (carbonColumn != null) {
+      carbonColumn.getDataType
+    } else {
+      carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
+      carbonColumn.getDataType match {
+        case DataType.INT => DataType.LONG
+        case DataType.LONG => DataType.LONG
+        case DataType.DECIMAL => DataType.DECIMAL
+        case _ => DataType.DOUBLE
+      }
+    }
+    CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
+  }
+
+  // Convert a Scala list to a Java list. We cannot use scalaList.asJava because, while
+  // deserializing, the classes inside the Scala list cannot be found (ClassNotFoundException).
+  private def convertToJavaList(
+      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
+    val javaList = new java.util.ArrayList[CarbonExpression]()
+    scalaList.foreach(javaList.add)
+    javaList
+  }
+}
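
For reference, a minimal sketch (not part of the patch) of createCarbonFilter, which maps Spark data source filters onto Carbon's expression tree:

    import org.apache.carbondata.spark.CarbonFilters
    import org.apache.spark.sql.sources
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))

    // name = 'bob' AND age > 18, expressed as Spark source filters
    val predicate = sources.And(
      sources.EqualTo("name", "bob"),
      sources.GreaterThan("age", 18))

    // yields Some(AndExpression(EqualToExpression(...), GreaterThanExpression(...)))
    val carbonExpr = CarbonFilters.createCarbonFilter(schema, predicate)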

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
new file mode 100644
index 0000000..a0503c7
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Contains all options for Spark data source
+ */
+class CarbonOption(options: Map[String, String]) {
+  def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
+
+  def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+
+  def tableName: String = options.getOrElse("tableName", "default_table")
+
+  def tablePath: String = s"$dbName/$tableName"
+
+  def tableId: String = options.getOrElse("tableId", "default_table_id")
+
+  def partitionCount: String = options.getOrElse("partitionCount", "1")
+
+  def partitionClass: String = {
+    options.getOrElse("partitionClass",
+      "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
+  }
+
+  def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean
+
+  def compress: Boolean = options.getOrElse("compress", "false").toBoolean
+
+  def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
+}
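
For reference (not part of the patch), how the defaults above resolve for a typical option map:

    import org.apache.carbondata.spark.CarbonOption

    val opts = new CarbonOption(Map(
      "dbName" -> "default",
      "tableName" -> "sales",
      "compress" -> "true"))

    opts.tableIdentifier  // "sales" ("tableName" wins; the db-qualified form is only the fallback)
    opts.tablePath        // "default/sales"
    opts.compress         // true
    opts.useKettle        // true (the default when the key is absent)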

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
new file mode 100644
index 0000000..056fcdb
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+
+ /**
+  * Column validator
+  */
+trait ColumnValidator {
+  def validateColumns(columns: Seq[ColumnSchema])
+}
+/**
+ * Dictionary related helper service
+ */
+trait DictionaryDetailService {
+  def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier, storePath: String): DictionaryDetail
+}
+
+/**
+ * Dictionary related detail
+ */
+case class DictionaryDetail(columnIdentifiers: Array[ColumnIdentifier],
+    dictFilePaths: Array[String], dictFileExists: Array[Boolean])
+
+/**
+ * Factory class
+ */
+object CarbonSparkFactory {
+   /**
+    * @return column validator
+    */
+  def getCarbonColumnValidator(): ColumnValidator = {
+    new CarbonColumnValidator
+  }
+
+  /**
+   * @return dictionary helper
+   */
+  def getDictionaryDetailService(): DictionaryDetailService = {
+    new DictionaryDetailHelper
+  }
+}
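
For reference (not part of the patch), the factory simply hands out the concrete helpers defined alongside it:

    import org.apache.carbondata.spark.{CarbonSparkFactory, ColumnValidator, DictionaryDetailService}

    val validator: ColumnValidator = CarbonSparkFactory.getCarbonColumnValidator()
    validator.validateColumns(Seq.empty)  // an empty schema has no duplicate ids, so no exception

    val dictService: DictionaryDetailService = CarbonSparkFactory.getDictionaryDetailService()
    // dictService.getDictionaryDetail(...) additionally needs an actual Carbon store on disk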

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
new file mode 100644
index 0000000..e9541de
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable.HashMap
+
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.datastorage.store.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+
+class DictionaryDetailHelper extends DictionaryDetailService {
+  def getDictionaryDetail(dictfolderPath: String, primDimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
+    val dictFilePaths = new Array[String](primDimensions.length)
+    val dictFileExists = new Array[Boolean](primDimensions.length)
+    val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
+
+    val fileType = FileFactory.getFileType(dictfolderPath)
+    // Metadata folder
+    val metadataDirectory = FileFactory.getCarbonFile(dictfolderPath, fileType)
+    // need to list all dictionary file paths along with an exists flag
+    val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
+      @Override def accept(pathname: CarbonFile): Boolean = {
+        CarbonTablePath.isDictionaryFile(pathname)
+      }
+    })
+    // 2. put dictionary file names into fileNamesMap
+    val fileNamesMap = new HashMap[String, Int]
+    for (i <- 0 until carbonFiles.length) {
+      fileNamesMap.put(carbonFiles(i).getName, i)
+    }
+    // 3. look up fileNamesMap: if a file name is present, the dictionary file exists; otherwise not.
+    primDimensions.zipWithIndex.foreach { f =>
+      columnIdentifier(f._2) = f._1.getColumnIdentifier
+      dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
+      dictFileExists(f._2) =
+        fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
+          case None => false
+          case Some(_) => true
+        }
+    }
+
+    DictionaryDetail(columnIdentifier, dictFilePaths, dictFileExists)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
new file mode 100644
index 0000000..254052b
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * It is just a key-value class. I could not find any other alternative to make the RDD class
+ * work, with my minimal knowledge of Scala.
+ * Maybe I will remove it later once I gain more knowledge :)
+ *
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.load.LoadMetadataDetails
+
+trait Value[V] extends Serializable {
+  def getValue(value: Array[Object]): V
+}
+
+class ValueImpl extends Value[Array[Object]] {
+  override def getValue(value: Array[Object]): Array[Object] = value
+}
+
+trait RawValue[V] extends Serializable {
+  def getValue(value: Array[Any]): V
+}
+
+class RawValueImpl extends RawValue[Array[Any]] {
+  override def getValue(value: Array[Any]): Array[Any] = value
+}
+
+trait DataLoadResult[K, V] extends Serializable {
+  def getKey(key: String, value: LoadMetadataDetails): (K, V)
+}
+
+class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
+  override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
+    (key, value)
+  }
+}
+
+
+trait PartitionResult[K, V] extends Serializable {
+  def getKey(key: Int, value: Boolean): (K, V)
+
+}
+
+class PartitionResultImpl extends PartitionResult[Int, Boolean] {
+  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
+
+trait MergeResult[K, V] extends Serializable {
+  def getKey(key: Int, value: Boolean): (K, V)
+
+}
+
+class MergeResultImpl extends MergeResult[Int, Boolean] {
+  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
+
+trait DeletedLoadResult[K, V] extends Serializable {
+  def getKey(key: String, value: String): (K, V)
+}
+
+class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
+  override def getKey(key: String, value: String): (String, String) = (key, value)
+}
+
+trait RestructureResult[K, V] extends Serializable {
+  def getKey(key: Int, value: Boolean): (K, V)
+}
+
+class RestructureResultImpl extends RestructureResult[Int, Boolean] {
+  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
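
For reference (not part of the patch), these traits only pair a key with a value so the load, merge and partition RDDs can emit (K, V) records, for example:

    import org.apache.carbondata.spark.{MergeResultImpl, PartitionResultImpl}

    val merge = new MergeResultImpl()
    val (taskNo, merged) = merge.getKey(0, true)          // simply (0, true)

    val partition = new PartitionResultImpl()
    val (partitionId, success) = partition.getKey(1, false)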

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
new file mode 100644
index 0000000..551fc9c
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvReader.scala
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.sql.readers
+
+/**
+ * Parser for parsing lines in bulk. Use this when efficiency is desired.
+ *
+ * @param iter iterator over lines in the file
+ * @param fieldSep the delimiter used to separate fields in a line
+ * @param lineSep the delimiter used to separate lines
+ * @param quote character used to quote fields
+ * @param escape character used to escape the quote character
+ * @param ignoreLeadingSpace ignore white space before a field
+ * @param ignoreTrailingSpace ignore white space after a field
+ * @param headers headers for the columns
+ * @param inputBufSize size of buffer to use for parsing input, tune for performance
+ * @param maxCols maximum number of columns allowed, for safety against bad inputs
+ */
+class CarbonBulkCsvReader (iter: Iterator[String],
+    split: Int,
+    fieldSep: Char = ',',
+    lineSep: String = "\n",
+    quote: Char = '"',
+    escape: Char = '\\',
+    commentMarker: Char = '#',
+    ignoreLeadingSpace: Boolean = true,
+    ignoreTrailingSpace: Boolean = true,
+    headers: Seq[String],
+    inputBufSize: Int = 128,
+    maxCols: Int = 20480)
+  extends CsvReader(fieldSep,
+      lineSep,
+      quote,
+      escape,
+      commentMarker,
+      ignoreLeadingSpace,
+      ignoreTrailingSpace,
+      headers,
+      inputBufSize,
+      maxCols)
+    with Iterator[Array[String]] {
+
+  private val reader = new CarbonStringIteratorReader(iter)
+  parser.beginParsing(reader)
+  private var nextRecord = parser.parseNext()
+
+  /**
+   * get the next parsed line.
+   *
+   * @return array of strings where each string is a field in the CSV record
+   */
+  def next: Array[String] = {
+    val curRecord = nextRecord
+    if(curRecord != null) {
+      nextRecord = parser.parseNext()
+    } else {
+      throw new NoSuchElementException("next record is null")
+    }
+    curRecord
+  }
+
+  def hasNext: Boolean = nextRecord != null
+
+}
+
+/**
+ * A Reader that "reads" from a sequence of lines. Spark's textFile method removes the newline at
+ * the end of each line; the Univocity parser requires a Reader that provides access to the data
+ * to be parsed and needs the newlines to be present.
+ * @param iter iterator over RDD[String]
+ */
+private class CarbonStringIteratorReader(val iter: Iterator[String]) extends java.io.Reader {
+
+  private var next: Long = 0
+  private var length: Long = 0  // length of input so far
+  private var start: Long = 0
+  private var str: String = null   // current string from iter
+
+  /**
+   * fetch the next string from iter if we are done with the current one;
+   * pretend there is a newline at the end of every string we get from iter
+   */
+  private def refill(): Unit = {
+    if (length == next) {
+      if (iter.hasNext) {
+        str = iter.next
+        start = length
+        // count one extra character for every line except the last one, to account for the '\n'
+        if (iter.hasNext) {
+          length += (str.length + 1) // allowance for newline removed by SparkContext.textFile()
+        } else {
+          length += str.length
+        }
+      } else {
+        str = null
+      }
+    }
+  }
+
+  /**
+   * read the next character, if at end of string pretend there is a new line
+   */
+  override def read(): Int = {
+    refill()
+    if(next >= length) {
+      -1
+    } else {
+      val cur = next - start
+      next += 1
+      if (cur == str.length) '\n' else str.charAt(cur.toInt)
+    }
+  }
+
+  /**
+   * read from str into cbuf
+   */
+  def read(cbuf: Array[Char], off: Int, len: Int): Int = {
+    refill()
+    var n = 0
+    if ((off < 0) || (off > cbuf.length) || (len < 0) ||
+      ((off + len) > cbuf.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException()
+    } else if (len == 0) {
+      n = 0
+    } else {
+      if (next >= length) {   // end of input
+        n = -1
+      } else {
+        n = Math.min(length - next, len).toInt // lesser of amount of input available or buf size
+        // add a '\n' to every line except the last one
+        if (n == length - next && iter.hasNext) {
+          str.getChars((next - start).toInt, (next - start + n - 1).toInt, cbuf, off)
+          cbuf(off + n - 1) = '\n'
+        } else {
+          str.getChars((next - start).toInt, (next - start + n).toInt, cbuf, off)
+        }
+        next += n
+        if (n < len) {
+          val m = read(cbuf, off + n, len - n)  // have more space, fetch more input from iter
+          if(m != -1) n += m
+        }
+      }
+    }
+    n
+  }
+
+  override def skip(ns: Long): Long = {
+    throw new IllegalArgumentException("Skip not implemented")
+  }
+
+  override def ready: Boolean = {
+    refill()
+    true
+  }
+
+  override def markSupported: Boolean = false
+
+  override def mark(readAheadLimit: Int): Unit = {
+    throw new IllegalArgumentException("Mark not implemented")
+  }
+
+  override def reset(): Unit = {
+    throw new IllegalArgumentException("Mark and hence reset not implemented")
+  }
+
+  def close(): Unit = { }
+}
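
For reference, a minimal sketch (not part of the patch) of driving CarbonBulkCsvReader over in-memory lines, assuming the univocity-backed CsvReader parent from spark-csv is on the classpath:

    import com.databricks.spark.sql.readers.CarbonBulkCsvReader

    val lines = Iterator("1,bob,23", "2,alice,31")
    val reader = new CarbonBulkCsvReader(lines, split = 0,
      headers = Seq("id", "name", "age"))

    // the reader is itself an Iterator[Array[String]]; each element is one parsed record
    reader.foreach(row => println(row.mkString("|")))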

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
new file mode 100644
index 0000000..e751fe8
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonCsvRelation.scala
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.csv
+
+import java.io.IOException
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.databricks.spark.csv.newapi.CarbonTextFile
+import com.databricks.spark.csv.util._
+import com.databricks.spark.sql.readers._
+import org.apache.commons.csv._
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation, TableScan}
+import org.apache.spark.sql.types._
+import org.slf4j.LoggerFactory
+
+import org.apache.carbondata.processing.etl.DataLoadingException
+
+case class CarbonCsvRelation protected[spark] (
+    location: String,
+    useHeader: Boolean,
+    delimiter: Char,
+    quote: Char,
+    escape: Character,
+    comment: Character,
+    parseMode: String,
+    parserLib: String,
+    ignoreLeadingWhiteSpace: Boolean,
+    ignoreTrailingWhiteSpace: Boolean,
+    userSchema: StructType = null,
+    charset: String = TextFile.DEFAULT_CHARSET.name(),
+    inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
+  extends BaseRelation with TableScan with InsertableRelation {
+
+  /**
+   * Limit the number of lines we'll search for a header row that isn't comment-prefixed.
+   */
+  private val MAX_COMMENT_LINES_IN_HEADER = 10
+
+  private val logger = LoggerFactory.getLogger(CarbonCsvRelation.getClass)
+
+  // Parse mode flags
+  if (!ParseModes.isValidMode(parseMode)) {
+    logger.warn(s"$parseMode is not a valid parse mode. Using ${ParseModes.DEFAULT}.")
+  }
+
+  if((ignoreLeadingWhiteSpace || ignoreTrailingWhiteSpace) && ParserLibs.isCommonsLib(parserLib)) {
+    logger.warn(s"Ignore white space options may not work with Commons parserLib option")
+  }
+
+  private val failFast = ParseModes.isFailFastMode(parseMode)
+  private val dropMalformed = ParseModes.isDropMalformedMode(parseMode)
+  private val permissive = ParseModes.isPermissiveMode(parseMode)
+
+  val schema = inferSchema()
+
+  def tokenRdd(header: Array[String]): RDD[Array[String]] = {
+
+    val baseRDD = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
+
+    if(ParserLibs.isUnivocityLib(parserLib)) {
+      univocityParseCSV(baseRDD, header)
+    } else {
+      val csvFormat = CSVFormat.DEFAULT
+        .withDelimiter(delimiter)
+        .withQuote(quote)
+        .withEscape(escape)
+        .withSkipHeaderRecord(false)
+        .withHeader(header: _*)
+        .withCommentMarker(comment)
+
+      // If header is set, make sure firstLine is materialized before sending to executors.
+      val filterLine = if (useHeader) firstLine else null
+
+      baseRDD.mapPartitions { iter =>
+        // When using header, any input line that equals firstLine is assumed to be header
+        val csvIter = if (useHeader) {
+          iter.filter(_ != filterLine)
+        } else {
+          iter
+        }
+        parseCSV(csvIter, csvFormat)
+      }
+    }
+  }
+
+  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
+  def buildScan: RDD[Row] = {
+    val schemaFields = schema.fields
+    tokenRdd(schemaFields.map(_.name)).flatMap{ tokens =>
+
+      if (dropMalformed && schemaFields.length != tokens.size) {
+        logger.warn(s"Dropping malformed line: $tokens")
+        None
+      } else if (failFast && schemaFields.length != tokens.size) {
+        throw new RuntimeException(s"Malformed line in FAILFAST mode: $tokens")
+      } else {
+        var index: Int = 0
+        val rowArray = new Array[Any](schemaFields.length)
+        try {
+          index = 0
+          while (index < schemaFields.length) {
+            val field = schemaFields(index)
+            rowArray(index) = TypeCast.castTo(tokens(index), field.dataType, field.nullable)
+            index = index + 1
+          }
+          Some(Row.fromSeq(rowArray))
+        } catch {
+          case aiob: ArrayIndexOutOfBoundsException if permissive =>
+            (index until schemaFields.length).foreach(ind => rowArray(ind) = null)
+            Some(Row.fromSeq(rowArray))
+        }
+      }
+    }
+  }
+
+  private def inferSchema(): StructType = {
+    if (this.userSchema != null) {
+      userSchema
+    } else {
+      val firstRow = if (ParserLibs.isUnivocityLib(parserLib)) {
+        val escapeVal = if (escape == null) '\\' else escape.charValue()
+        val commentChar: Char = if (comment == null) '\0' else comment
+        new LineCsvReader(fieldSep = delimiter, quote = quote, escape = escapeVal,
+          commentMarker = commentChar).parseLine(firstLine)
+      } else {
+        val csvFormat = CSVFormat.DEFAULT
+          .withDelimiter(delimiter)
+          .withQuote(quote)
+          .withEscape(escape)
+          .withSkipHeaderRecord(false)
+        CSVParser.parse(firstLine, csvFormat).getRecords.get(0).asScala.toArray
+      }
+      if(null == firstRow) {
+        throw new DataLoadingException("First line of the csv is not valid.")
+      }
+      val header = if (useHeader) {
+        firstRow
+      } else {
+        firstRow.zipWithIndex.map { case (value, index) => s"C$index"}
+      }
+      if (this.inferCsvSchema) {
+        InferSchema(tokenRdd(header), header)
+      } else {
+        // By default fields are assumed to be StringType
+        val schemaFields = header.map { fieldName =>
+          StructField(fieldName.toString, StringType, nullable = true)
+        }
+        StructType(schemaFields)
+      }
+    }
+  }
+
+  /**
+   * Returns the first line of the first non-empty file in path
+   */
+  private lazy val firstLine = {
+    val csv = CarbonTextFile.withCharset(sqlContext.sparkContext, location, charset)
+    if (comment == null) {
+      csv.first()
+    } else {
+      csv.take(MAX_COMMENT_LINES_IN_HEADER)
+        .find(x => !StringUtils.isEmpty(x) && !x.startsWith(comment.toString))
+        .getOrElse(sys.error(s"No uncommented header line in " +
+          s"first $MAX_COMMENT_LINES_IN_HEADER lines"))
+    }
+  }
+
+  private def univocityParseCSV(
+     file: RDD[String],
+     header: Seq[String]): RDD[Array[String]] = {
+    // If header is set, make sure firstLine is materialized before sending to executors.
+    val filterLine = if (useHeader) firstLine else null
+    val dataLines = if (useHeader) file.filter(_ != filterLine) else file
+    val rows = dataLines.mapPartitionsWithIndex({
+      case (split, iter) =>
+        val escapeVal = if (escape == null) '\\' else escape.charValue()
+        val commentChar: Char = if (comment == null) '\0' else comment
+
+        new CarbonBulkCsvReader(iter, split,
+          headers = header, fieldSep = delimiter,
+          quote = quote, escape = escapeVal, commentMarker = commentChar,
+          ignoreLeadingSpace = ignoreLeadingWhiteSpace,
+          ignoreTrailingSpace = ignoreTrailingWhiteSpace)
+    }, true)
+    rows
+  }
+
+  private def parseCSV(
+      iter: Iterator[String],
+      csvFormat: CSVFormat): Iterator[Array[String]] = {
+    iter.flatMap { line =>
+      try {
+        val records = CSVParser.parse(line, csvFormat).getRecords
+        if (records.isEmpty) {
+          logger.warn(s"Ignoring empty line: $line")
+          None
+        } else {
+          Some(records.get(0).asScala.toArray)
+        }
+      } catch {
+        case NonFatal(e) if !failFast =>
+          logger.error(s"Exception while parsing line: $line. ", e)
+          None
+      }
+    }
+  }
+
+  // The function below was borrowed from JSONRelation
+  override def insert(data: DataFrame, overwrite: Boolean): Unit = {
+    val filesystemPath = new Path(location)
+    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+    if (overwrite) {
+      try {
+        fs.delete(filesystemPath, true)
+      } catch {
+        case e: IOException =>
+          throw new IOException(
+            s"Unable to clear output directory ${filesystemPath.toString} prior"
+              + s" to INSERT OVERWRITE a CSV table:\n${e.toString}")
+      }
+      // Write the data. We assume that schema isn't changed, and we won't update it.
+      data.saveAsCsvFile(location, Map("delimiter" -> delimiter.toString))
+    } else {
+      sys.error("CSV tables only support INSERT OVERWRITE for now.")
+    }
+  }
+}
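A minimal usage sketch of the relation above (not part of this patch; the path and
option values are hypothetical). Spark resolves the short format name below to the
newapi DefaultSource added later in this commit, which builds CarbonCsvRelation from
these options:

    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Hypothetical input path and options. header=true makes the first non-comment
    // line the header; inferSchema=false keeps every field as StringType.
    def readCsv(sqlContext: SQLContext): DataFrame = {
      sqlContext.read
        .format("com.databricks.spark.csv.newapi")
        .option("header", "true")
        .option("delimiter", ",")
        .option("comment", "#")
        .option("inferSchema", "false")
        .load("/tmp/sample.csv")
    }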

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
new file mode 100644
index 0000000..b5d7542
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.csv.newapi
+
+import java.nio.charset.Charset
+
+import com.databricks.spark.csv.util.TextFile
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{LongWritable, Text}
+import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.{NewHadoopRDD, RDD}
+import org.apache.spark.util.FileUtils
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Creates text file RDDs for CSV loading using Hadoop's new-API TextInputFormat.
+ */
+object CarbonTextFile {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  def configSplitMaxSize(context: SparkContext, filePaths: String,
+      hadoopConfiguration: Configuration): Unit = {
+    val defaultParallelism = if (context.defaultParallelism < 1) {
+      1
+    } else {
+      context.defaultParallelism
+    }
+    val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
+    val blockSize =
+      hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
+    LOGGER.info("[Block Distribution]")
+    // calculate new block size to allow use all the parallelism
+    if (spaceConsumed < defaultParallelism * blockSize) {
+      var newSplitSize: Long = spaceConsumed / defaultParallelism
+      if (newSplitSize < CarbonCommonConstants.CARBON_16MB) {
+        newSplitSize = CarbonCommonConstants.CARBON_16MB
+      }
+      hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
+      LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " +
+          s"defaultParallelism: $defaultParallelism")
+      LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
+    }
+  }
+  private def newHadoopRDD(sc: SparkContext, location: String) = {
+    val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
+    hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
+    hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
+    hadoopConfiguration.set("io.compression.codecs",
+      """org.apache.hadoop.io.compress.GzipCodec,
+         org.apache.hadoop.io.compress.DefaultCodec,
+         org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
+
+    configSplitMaxSize(sc, location, hadoopConfiguration)
+    new NewHadoopRDD[LongWritable, Text](
+      sc,
+      classOf[TextInputFormat],
+      classOf[LongWritable],
+      classOf[Text],
+      hadoopConfiguration).setName("newHadoopRDD-spark-csv")
+  }
+
+  def withCharset(sc: SparkContext, location: String, charset: String): RDD[String] = {
+    if (Charset.forName(charset) == TextFile.DEFAULT_CHARSET) {
+      newHadoopRDD(sc, location).map(pair => pair._2.toString)
+    } else {
+      // can't pass a Charset object here because it's not serializable
+      // TODO: maybe use mapPartitions instead?
+      newHadoopRDD(sc, location).map(
+        pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))
+    }
+  }
+}
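The split-size tuning in configSplitMaxSize boils down to a small calculation; the
standalone sketch below restates it, assuming CARBON_16MB and CARBON_256MB equal
16 MB and 256 MB as their names suggest:

    // Returns the new split max size, or None when the default block size already
    // spreads the input across all cores.
    def computeSplitMaxSize(spaceConsumed: Long,
        defaultParallelism: Int,
        blockSize: Long = 256L * 1024 * 1024): Option[Long] = {
      val parallelism = math.max(defaultParallelism, 1)
      if (spaceConsumed < parallelism * blockSize) {
        // Shrink splits so every core gets work, but never below 16 MB.
        Some(math.max(spaceConsumed / parallelism, 16L * 1024 * 1024))
      } else {
        None
      }
    }

For example, 1 GB of input with defaultParallelism = 16 and a 256 MB block size
yields a 64 MB split max size.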

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
new file mode 100644
index 0000000..89595e5
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.databricks.spark.csv.newapi
+
+import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
+import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Provides access to CSV data from pure SQL statements (i.e. for users of the
+ * JDBC server).
+ */
+class DefaultSource
+    extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
+
+  private def checkPath(parameters: Map[String, String]): String = {
+    parameters.getOrElse("path", sys.error("'path' must be specified for CSV data."))
+  }
+
+  /**
+   * Creates a new relation for data stored in CSV format, given the parameters.
+   * Parameters must include 'path'; 'delimiter', 'quote', and 'header' are optional.
+   */
+  override def createRelation(sqlContext: SQLContext,
+      parameters: Map[String, String]): BaseRelation = {
+    createRelation(sqlContext, parameters, null)
+  }
+
+  /**
+   * Creates a new relation for data stored in CSV format, given the parameters and a
+   * user-supplied schema. Parameters must include 'path'; 'delimiter', 'quote',
+   * and 'header' are optional.
+   */
+  override def createRelation(
+    sqlContext: SQLContext,
+    parameters: Map[String, String],
+    schema: StructType): BaseRelation = {
+    val path = checkPath(parameters)
+    val delimiter = TypeCast.toChar(parameters.getOrElse("delimiter", ","))
+
+    val quote = parameters.getOrElse("quote", "\"")
+    val quoteChar = if (quote.length == 1) {
+      quote.charAt(0)
+    } else {
+      throw new Exception("Quotation cannot be more than one character.")
+    }
+
+    val escape = parameters.getOrElse("escape", null)
+    val escapeChar: Character = if (escape == null || (escape.length == 0)) {
+      null
+    } else if (escape.length == 1) {
+      escape.charAt(0)
+    } else {
+      throw new Exception("Escape character cannot be more than one character.")
+    }
+
+    val comment = parameters.getOrElse("comment", "#")
+    val commentChar: Character = if (comment == null) {
+      null
+    } else if (comment.length == 1) {
+      comment.charAt(0)
+    } else {
+      throw new Exception("Comment marker cannot be more than one character.")
+    }
+
+    val parseMode = parameters.getOrElse("mode", "PERMISSIVE")
+
+    val useHeader = parameters.getOrElse("header", "false")
+    val headerFlag = if (useHeader == "true") {
+      true
+    } else if (useHeader == "false") {
+      false
+    } else {
+      throw new Exception("Header flag can be true or false")
+    }
+
+    val parserLib = parameters.getOrElse("parserLib", ParserLibs.DEFAULT)
+    val ignoreLeadingWhiteSpace = parameters.getOrElse("ignoreLeadingWhiteSpace", "false")
+    val ignoreLeadingWhiteSpaceFlag = if (ignoreLeadingWhiteSpace == "false") {
+      false
+    } else if (ignoreLeadingWhiteSpace == "true") {
+      if (!ParserLibs.isUnivocityLib(parserLib)) {
+        throw new Exception("Ignore whitesspace supported for Univocity parser only")
+      }
+      true
+    } else {
+      throw new Exception("Ignore white space flag can be true or false")
+    }
+    val ignoreTrailingWhiteSpace = parameters.getOrElse("ignoreTrailingWhiteSpace", "false")
+    val ignoreTrailingWhiteSpaceFlag = if (ignoreTrailingWhiteSpace == "false") {
+      false
+    } else if (ignoreTrailingWhiteSpace == "true") {
+      if (!ParserLibs.isUnivocityLib(parserLib)) {
+        throw new Exception("Ignore whitespace supported for the Univocity parser only")
+      }
+      true
+    } else {
+      throw new Exception("Ignore white space flag can be true or false")
+    }
+
+    val charset = parameters.getOrElse("charset", TextFile.DEFAULT_CHARSET.name())
+    // TODO validate charset?
+
+    val inferSchema = parameters.getOrElse("inferSchema", "false")
+    val inferSchemaFlag = if (inferSchema == "false") {
+      false
+    } else if (inferSchema == "true") {
+      true
+    } else {
+      throw new Exception("Infer schema flag can be true or false")
+    }
+
+    CarbonCsvRelation(path,
+      headerFlag,
+      delimiter,
+      quoteChar,
+      escapeChar,
+      commentChar,
+      parseMode,
+      parserLib,
+      ignoreLeadingWhiteSpaceFlag,
+      ignoreTrailingWhiteSpaceFlag,
+      schema,
+      charset,
+      inferSchemaFlag)(sqlContext)
+  }
+
+  override def createRelation(
+    sqlContext: SQLContext,
+    mode: SaveMode,
+    parameters: Map[String, String],
+    data: DataFrame): BaseRelation = {
+    val path = checkPath(parameters)
+    val filesystemPath = new Path(path)
+    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+    val doSave = if (fs.exists(filesystemPath)) {
+      mode match {
+        case SaveMode.Append =>
+          sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
+        case SaveMode.Overwrite =>
+          fs.delete(filesystemPath, true)
+          true
+        case SaveMode.ErrorIfExists =>
+          sys.error(s"path $path already exists.")
+        case SaveMode.Ignore => false
+      }
+    } else {
+      true
+    }
+
+    val codec: Class[_ <: CompressionCodec] =
+      parameters.getOrElse("codec", "none") match {
+        case "gzip" => classOf[GzipCodec]
+        case _ => null
+      }
+
+    if (doSave) {
+      // Only save data when the save mode is not ignore.
+      data.saveAsCsvFile(path, parameters, codec)
+    }
+
+    createRelation(sqlContext, parameters, data.schema)
+  }
+}
+
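A hypothetical write-path usage of this provider (path and values are illustrative,
not from this patch). SaveMode.Overwrite deletes the target directory first, Append
is rejected, and the optional "codec" parameter only recognises "gzip":

    import org.apache.spark.sql.{DataFrame, SaveMode}

    // Hypothetical output path; any codec value other than "gzip" falls back to
    // uncompressed output.
    def writeCsv(df: DataFrame): Unit = {
      df.write
        .format("com.databricks.spark.csv.newapi")
        .mode(SaveMode.Overwrite)
        .option("codec", "gzip")
        .save("/tmp/csv-out")
    }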

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
new file mode 100644
index 0000000..9cc46c1
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.command.Partitioner
+
+import org.apache.carbondata.spark.Value
+import org.apache.carbondata.spark.util.CarbonQueryUtil
+
+class CarbonCleanFilesRDD[V: ClassTag](
+    sc: SparkContext,
+    valueClass: Value[V],
+    databaseName: String,
+    tableName: String,
+    partitioner: Partitioner)
+  extends RDD[V](sc, Nil) {
+
+  sc.setLocalProperty("spark.scheduler.pool", "DDL")
+
+  override def getPartitions: Array[Partition] = {
+    val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null)
+    splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1))
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = {
+    val iter = new Iterator[(V)] {
+      val split = theSplit.asInstanceOf[CarbonLoadPartition]
+      logInfo("Input split: " + split.serializableHadoopSplit.value)
+      // TODO call CARBON delete API
+
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          finished = true
+          havePair = !finished
+        }
+        !finished
+      }
+
+      override def next(): V = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        valueClass.getValue(null)
+      }
+
+    }
+    iter
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val theSplit = split.asInstanceOf[CarbonLoadPartition]
+    val s = theSplit.serializableHadoopSplit.value.getLocations.asScala
+    logInfo("Host Name: " + s.head + s.length)
+    s
+  }
+}
+
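Driver-side use of this RDD is sketched below under stated assumptions: the Value
implementation comes from the caller and the partitioner is left null, since nothing
in compute() reads it yet. The RDD does nothing until an action such as collect()
runs compute() once per table split (the actual delete call is still a TODO in this
patch):

    import scala.reflect.ClassTag

    import org.apache.spark.SparkContext

    import org.apache.carbondata.spark.Value
    import org.apache.carbondata.spark.rdd.CarbonCleanFilesRDD

    // Hedged sketch: materialise the RDD to trigger one clean-files task per split.
    def cleanFiles[V: ClassTag](sc: SparkContext, valueClass: Value[V],
        databaseName: String, tableName: String): Unit = {
      new CarbonCleanFilesRDD(sc, valueClass, databaseName, tableName, partitioner = null)
        .collect()
    }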