Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/08/24 13:01:35 UTC

[3/4] carbondata git commit: [CARBONDATA-2872] Added Spark FileFormat interface implementation in Carbon
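
For context, a Spark FileFormat implementation such as the one added in this commit is normally consumed through the DataFrame reader/writer API. The sketch below illustrates only that usage pattern; it assumes the format is registered under the short name "carbon" (the name the code in this patch matches on), and the paths and SparkSession setup are illustrative rather than part of the patch.

import org.apache.spark.sql.SparkSession

object CarbonFileFormatUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("carbon-fileformat-sketch").getOrCreate()

    // Write a DataFrame through the FileFormat interface (output path is an example).
    spark.range(0, 100).toDF("id")
      .write.format("carbon").save("/tmp/carbon_table")

    // Read it back; pushed-down filters can be pruned on the driver by CarbonFileIndex.
    spark.read.format("carbon").load("/tmp/carbon_table")
      .filter("id > 10")
      .show()

    spark.stop()
  }
}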

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/pom.xml b/integration/spark-datasource/pom.xml
new file mode 100644
index 0000000..38cf629
--- /dev/null
+++ b/integration/spark-datasource/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-spark-datasource</artifactId>
+  <name>Apache CarbonData :: Spark Datasource</name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+    <jacoco.append>true</jacoco.append>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-hadoop</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-repl_${scala.binary.version}</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <resources>
+      <resource>
+        <directory>src/resources</directory>
+      </resource>
+      <resource>
+        <directory>.</directory>
+        <includes>
+          <include>CARBON_SPARK_INTERFACELogResource.properties</include>
+        </includes>
+      </resource>
+    </resources>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.18</version>
+        <!-- Note config is repeated in scalatest config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+          </systemProperties>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine> ${argLine} -ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.carbon.hive.schema.store>${carbon.hive.based.metastore}</spark.carbon.hive.schema.store>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+    <profile>
+      <id>build-all</id>
+      <properties>
+        <spark.version>2.2.1</spark.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <scala.version>2.11.8</scala.version>
+      </properties>
+    </profile>
+    <profile>
+      <id>sdvtest</id>
+      <properties>
+        <maven.test.skip>true</maven.test.skip>
+      </properties>
+    </profile>
+  </profiles>
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
new file mode 100644
index 0000000..7e38691
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/converter/SparkDataTypeConverterImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.converter;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.util.DataTypeConverter;
+
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
+import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * Converts Java data types to Spark data types.
+ */
+public final class SparkDataTypeConverterImpl implements DataTypeConverter, Serializable {
+
+  private static final long serialVersionUID = -4379212832935070583L;
+
+  @Override
+  public Object convertFromStringToDecimal(Object data) {
+    BigDecimal javaDecVal = new BigDecimal(data.toString());
+    return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
+  }
+
+  @Override
+  public Object convertFromBigDecimalToDecimal(Object data) {
+    if (null == data) {
+      return null;
+    }
+    return org.apache.spark.sql.types.Decimal.apply((BigDecimal)data);
+  }
+
+  @Override
+  public Object convertFromDecimalToBigDecimal(Object data) {
+    return ((org.apache.spark.sql.types.Decimal) data).toJavaBigDecimal();
+  }
+
+  @Override
+  public byte[] convertFromStringToByte(Object data) {
+    if (null == data) {
+      return null;
+    }
+    return UTF8String.fromString((String) data).getBytes();
+  }
+
+  @Override
+  public Object convertFromByteToUTF8String(byte[] data) {
+    if (null == data) {
+      return null;
+    }
+    return UTF8String.fromBytes(data);
+  }
+
+  @Override
+  public byte[] convertFromByteToUTF8Bytes(byte[] data) {
+    return UTF8String.fromBytes(data).getBytes();
+  }
+
+  @Override
+  public Object convertFromStringToUTF8String(Object data) {
+    if (null == data) {
+      return null;
+    }
+    return UTF8String.fromString((String) data);
+  }
+
+  @Override
+  public Object wrapWithGenericArrayData(Object data) {
+    return new GenericArrayData(data);
+  }
+
+  @Override
+  public Object wrapWithGenericRow(Object[] fields) {
+    return new GenericInternalRow(fields);
+  }
+
+  private static org.apache.spark.sql.types.DataType convertCarbonToSparkDataType(
+      DataType carbonDataType) {
+    if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.STRING) {
+      return DataTypes.StringType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT) {
+      return DataTypes.ShortType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT) {
+      return DataTypes.IntegerType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+      return DataTypes.LongType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE) {
+      return DataTypes.DoubleType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN) {
+      return DataTypes.BooleanType;
+    } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(carbonDataType)) {
+      return DataTypes.createDecimalType();
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.TIMESTAMP) {
+      return DataTypes.TimestampType;
+    } else if (carbonDataType == org.apache.carbondata.core.metadata.datatype.DataTypes.DATE) {
+      return DataTypes.DateType;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Converts a CarbonColumn array to Spark's StructField array.
+   */
+  @Override
+  public Object[] convertCarbonSchemaToSparkSchema(CarbonColumn[] carbonColumns) {
+    StructField[] fields = new StructField[carbonColumns.length];
+    for (int i = 0; i < carbonColumns.length; i++) {
+      CarbonColumn carbonColumn = carbonColumns[i];
+      if (carbonColumn.isDimension()) {
+        if (carbonColumn.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+          DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+              .getDirectDictionaryGenerator(carbonColumn.getDataType());
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+        } else if (!carbonColumn.hasEncoding(Encoding.DICTIONARY)) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+        } else if (carbonColumn.isComplex()) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(carbonColumn.getDataType()), true, null);
+        } else {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(
+                  org.apache.carbondata.core.metadata.datatype.DataTypes.INT), true, null);
+        }
+      } else if (carbonColumn.isMeasure()) {
+        DataType dataType = carbonColumn.getDataType();
+        if (dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.BOOLEAN
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.SHORT
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.INT
+            || dataType == org.apache.carbondata.core.metadata.datatype.DataTypes.LONG) {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(dataType), true, null);
+        } else if (org.apache.carbondata.core.metadata.datatype.DataTypes.isDecimal(dataType)) {
+          CarbonMeasure measure = (CarbonMeasure) carbonColumn;
+          fields[i] = new StructField(carbonColumn.getColName(),
+              new DecimalType(measure.getPrecision(), measure.getScale()), true, null);
+        } else {
+          fields[i] = new StructField(carbonColumn.getColName(),
+              convertCarbonToSparkDataType(
+                  org.apache.carbondata.core.metadata.datatype.DataTypes.DOUBLE), true, null);
+        }
+      }
+    }
+    return fields;
+  }
+
+}
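
The following is a small, hedged usage sketch (not part of the patch) showing the converter above being exercised from Scala; it assumes Spark and this module are on the classpath, and the object name and literal values are illustrative.

import org.apache.carbondata.converter.SparkDataTypeConverterImpl
import org.apache.spark.unsafe.types.UTF8String

object SparkDataTypeConverterSketch {
  def main(args: Array[String]): Unit = {
    val converter = new SparkDataTypeConverterImpl
    // String -> Spark's internal Decimal representation
    val dec = converter.convertFromStringToDecimal("123.45")
    // String -> UTF8String, the internal string type Spark uses in InternalRow
    val str = converter.convertFromStringToUTF8String("carbon").asInstanceOf[UTF8String]
    println(s"$dec / $str")
  }
}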

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
new file mode 100644
index 0000000..7f1e577
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/CarbonDictionaryWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.vectorreader;
+
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.io.api.Binary;
+
+public class CarbonDictionaryWrapper extends Dictionary {
+
+  private Binary[] binaries;
+
+  CarbonDictionaryWrapper(Encoding encoding, CarbonDictionary dictionary) {
+    super(encoding);
+    binaries = new Binary[dictionary.getDictionarySize()];
+    for (int i = 0; i < binaries.length; i++) {
+      binaries[i] = Binary.fromReusedByteArray(dictionary.getDictionaryValue(i));
+    }
+  }
+
+  @Override public int getMaxId() {
+    return binaries.length - 1;
+  }
+
+  @Override public Binary decodeToBinary(int id) {
+    return binaries[id];
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
new file mode 100644
index 0000000..4f34650
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/ColumnarVectorWrapper.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonDictionary;
+
+import org.apache.parquet.column.Encoding;
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.types.Decimal;
+
+class ColumnarVectorWrapper implements CarbonColumnVector {
+
+  private ColumnVector columnVector;
+
+  private boolean[] filteredRows;
+
+  private int counter;
+
+  private boolean filteredRowsExist;
+
+  private DataType blockDataType;
+
+  private CarbonColumnVector dictionaryVector;
+
+  ColumnarVectorWrapper(ColumnVector columnVector, boolean[] filteredRows) {
+    this.columnVector = columnVector;
+    this.filteredRows = filteredRows;
+    if (columnVector.getDictionaryIds() != null) {
+      this.dictionaryVector =
+          new ColumnarVectorWrapper(columnVector.getDictionaryIds(), filteredRows);
+    }
+  }
+
+  @Override public void putBoolean(int rowId, boolean value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putBoolean(counter++, value);
+    }
+  }
+
+  @Override public void putFloat(int rowId, float value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putFloat(counter++, value);
+    }
+  }
+
+  @Override public void putShort(int rowId, short value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putShort(counter++, value);
+    }
+  }
+
+  @Override public void putShorts(int rowId, int count, short value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putShort(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putShorts(rowId, count, value);
+    }
+  }
+
+  @Override public void putInt(int rowId, int value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putInt(counter++, value);
+    }
+  }
+
+  @Override public void putInts(int rowId, int count, int value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putInt(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putInts(rowId, count, value);
+    }
+  }
+
+  @Override public void putLong(int rowId, long value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putLong(counter++, value);
+    }
+  }
+
+  @Override public void putLongs(int rowId, int count, long value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putLong(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putLongs(rowId, count, value);
+    }
+  }
+
+  @Override public void putDecimal(int rowId, BigDecimal value, int precision) {
+    if (!filteredRows[rowId]) {
+      Decimal toDecimal = Decimal.apply(value);
+      columnVector.putDecimal(counter++, toDecimal, precision);
+    }
+  }
+
+  @Override public void putDecimals(int rowId, int count, BigDecimal value, int precision) {
+    Decimal decimal = Decimal.apply(value);
+    for (int i = 0; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putDecimal(counter++, decimal, precision);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putDouble(int rowId, double value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putDouble(counter++, value);
+    }
+  }
+
+  @Override public void putDoubles(int rowId, int count, double value) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putDouble(counter++, value);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putDoubles(rowId, count, value);
+    }
+  }
+
+  @Override public void putBytes(int rowId, byte[] value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putByteArray(counter++, value);
+    }
+  }
+
+  @Override public void putBytes(int rowId, int count, byte[] value) {
+    for (int i = 0; i < count; i++) {
+      if (!filteredRows[rowId]) {
+        columnVector.putByteArray(counter++, value);
+      }
+      rowId++;
+    }
+  }
+
+  @Override public void putBytes(int rowId, int offset, int length, byte[] value) {
+    if (!filteredRows[rowId]) {
+      columnVector.putByteArray(counter++, value, offset, length);
+    }
+  }
+
+  @Override public void putNull(int rowId) {
+    if (!filteredRows[rowId]) {
+      columnVector.putNull(counter++);
+    }
+  }
+
+  @Override public void putNulls(int rowId, int count) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putNull(counter++);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putNulls(rowId, count);
+    }
+  }
+
+  @Override public void putNotNull(int rowId) {
+    if (!filteredRows[rowId]) {
+      columnVector.putNotNull(counter++);
+    }
+  }
+
+  @Override public void putNotNull(int rowId, int count) {
+    if (filteredRowsExist) {
+      for (int i = 0; i < count; i++) {
+        if (!filteredRows[rowId]) {
+          columnVector.putNotNull(counter++);
+        }
+        rowId++;
+      }
+    } else {
+      columnVector.putNotNulls(rowId, count);
+    }
+  }
+
+  @Override public boolean isNull(int rowId) {
+    return columnVector.isNullAt(rowId);
+  }
+
+  @Override public void putObject(int rowId, Object obj) {
+    //TODO handle complex types
+  }
+
+  @Override public Object getData(int rowId) {
+    //TODO handle complex types
+    return null;
+  }
+
+  @Override public void reset() {
+    counter = 0;
+    filteredRowsExist = false;
+    if (null != dictionaryVector) {
+      dictionaryVector.reset();
+    }
+  }
+
+  @Override public DataType getType() {
+    return CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(columnVector.dataType());
+  }
+
+  @Override
+  public DataType getBlockDataType() {
+    return blockDataType;
+  }
+
+  @Override
+  public void setBlockDataType(DataType blockDataType) {
+    this.blockDataType = blockDataType;
+  }
+
+  @Override public void setFilteredRowsExist(boolean filteredRowsExist) {
+    this.filteredRowsExist = filteredRowsExist;
+  }
+
+  @Override public void setDictionary(CarbonDictionary dictionary) {
+    if (dictionary == null) {
+      columnVector.setDictionary(null);
+    } else {
+      columnVector.setDictionary(new CarbonDictionaryWrapper(Encoding.PLAIN, dictionary));
+    }
+  }
+
+  @Override public boolean hasDictionary() {
+    return columnVector.hasDictionary();
+  }
+
+  @Override public CarbonColumnVector getDictionaryVector() {
+    return dictionaryVector;
+  }
+}
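
To make the row-skipping behaviour of ColumnarVectorWrapper easier to follow, here is a self-contained, hedged sketch (illustrative only, not part of the patch) of the same pattern: rows whose ids are flagged in filteredRows are dropped, and the surviving values are written at consecutive positions tracked by a counter, just as the put* methods above do against the Spark ColumnVector.

object FilterCompactSketch {
  // Drops values whose rowId is flagged in filteredRows and compacts the survivors.
  def compact(values: Array[Int], filteredRows: Array[Boolean]): Array[Int] = {
    val out = new Array[Int](values.length)
    var counter = 0
    var rowId = 0
    while (rowId < values.length) {
      if (!filteredRows(rowId)) {   // mirrors: if (!filteredRows[rowId]) columnVector.putInt(counter++, value)
        out(counter) = values(rowId)
        counter += 1
      }
      rowId += 1
    }
    out.take(counter)
  }

  def main(args: Array[String]): Unit = {
    // Rows 1 and 3 are filtered out; prints "10,30".
    println(compact(Array(10, 20, 30, 40), Array(false, true, false, true)).mkString(","))
  }
}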

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
new file mode 100644
index 0000000..2c53d20
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/carbondata/spark/vectorreader/VectorizedCarbonRecordReader.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.vectorreader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
+import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.ProjectionDimension;
+import org.apache.carbondata.core.scan.model.ProjectionMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.AbstractDetailQueryResultIterator;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.scan.result.vector.CarbonColumnarBatch;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.hadoop.AbstractRecordReader;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.CarbonMultiBlockSplit;
+import org.apache.carbondata.hadoop.InputMetricsStats;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.types.DecimalType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly, using the
+ * CarbonData column APIs to fill the data straight into columns.
+ */
+public class VectorizedCarbonRecordReader extends AbstractRecordReader<Object> {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(VectorizedCarbonRecordReader.class.getName());
+
+  private int batchIdx = 0;
+
+  private int numBatched = 0;
+
+  private ColumnarBatch columnarBatch;
+
+  private CarbonColumnarBatch carbonColumnarBatch;
+
+  /**
+   * If true, this class returns batches instead of rows.
+   */
+  private boolean returnColumnarBatch;
+
+  private boolean[] isNoDictStringField;
+
+  /**
+   * The default config for whether the columnarBatch should be on-heap.
+   */
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
+  private QueryModel queryModel;
+
+  private AbstractDetailQueryResultIterator iterator;
+
+  private QueryExecutor queryExecutor;
+
+  private InputMetricsStats inputMetricsStats;
+
+  public VectorizedCarbonRecordReader(QueryModel queryModel, InputMetricsStats inputMetricsStats,
+      String enableBatch) {
+    this.queryModel = queryModel;
+    this.inputMetricsStats = inputMetricsStats;
+    if (enableBatch.equals("true")) {
+      enableReturningBatches();
+    }
+  }
+
+
+  /**
+   * Can be called before any rows are returned to enable returning columnar batches directly.
+   */
+  public void enableReturningBatches() {
+    returnColumnarBatch = true;
+  }
+
+  /**
+   * Implementation of RecordReader API.
+   */
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException, UnsupportedOperationException {
+    // The input split can contain a single HDFS block or multiple blocks, so first get all the
+    // blocks and then set them in the query model.
+    List<CarbonInputSplit> splitList;
+    if (inputSplit instanceof CarbonInputSplit) {
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonInputSplit) inputSplit);
+    } else if (inputSplit instanceof CarbonMultiBlockSplit) {
+      // contains multiple blocks; this is an optimization for concurrent queries.
+      CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit;
+      splitList = multiBlockSplit.getAllSplits();
+    } else {
+      throw new RuntimeException("unsupported input split type: " + inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+    queryModel.setVectorReader(true);
+    try {
+      queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+      iterator = (AbstractDetailQueryResultIterator) queryExecutor.execute(queryModel);
+    } catch (QueryExecutionException e) {
+      if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+        LOGGER.error(e);
+        throw new InterruptedException(
+            "Insert overwrite may be in progress. Please check " + e.getMessage());
+      }
+      throw new InterruptedException(e.getMessage());
+    } catch (Exception e) {
+      if (ExceptionUtils.indexOfThrowable(e, FileNotFoundException.class) > 0) {
+        LOGGER.error(e);
+        throw new InterruptedException(
+            "Insert overwrite may be in progress. Please check " + e.getMessage());
+      }
+      throw e;
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    logStatistics(rowCount, queryModel.getStatisticsRecorder());
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
+    }
+    // clear dictionary cache
+    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+    if (null != columnToDictionaryMapping) {
+      for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+        CarbonUtil.clearDictionaryCache(entry.getValue());
+      }
+    }
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    resultBatch();
+
+    if (returnColumnarBatch) {
+      return nextBatch();
+    }
+
+    if (batchIdx >= numBatched) {
+      if (!nextBatch()) return false;
+    }
+    ++batchIdx;
+    return true;
+  }
+
+  @Override
+  public Object getCurrentValue() throws IOException, InterruptedException {
+    if (returnColumnarBatch) {
+      int value = columnarBatch.numValidRows();
+      rowCount += value;
+      if (inputMetricsStats != null) {
+        inputMetricsStats.incrementRecordRead((long) value);
+      }
+      return columnarBatch;
+    }
+    rowCount += 1;
+    return columnarBatch.getRow(batchIdx - 1);
+  }
+
+  @Override
+  public Void getCurrentKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    // TODO: Implement it based on the total number of rows it is going to retrieve.
+    return 0;
+  }
+
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
+
+  public void initBatch(MemoryMode memMode, StructType partitionColumns,
+      InternalRow partitionValues) {
+    List<ProjectionDimension> queryDimension = queryModel.getProjectionDimensions();
+    List<ProjectionMeasure> queryMeasures = queryModel.getProjectionMeasures();
+    StructField[] fields = new StructField[queryDimension.size() + queryMeasures.size()];
+    this.isNoDictStringField = new boolean[queryDimension.size() + queryMeasures.size()];
+    for (int i = 0; i < queryDimension.size(); i++) {
+      ProjectionDimension dim = queryDimension.get(i);
+      if (dim.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        DirectDictionaryGenerator generator = DirectDictionaryKeyGeneratorFactory
+            .getDirectDictionaryGenerator(dim.getDimension().getDataType());
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(generator.getReturnType()), true, null);
+      } else if (!dim.getDimension().hasEncoding(Encoding.DICTIONARY)) {
+        if (dim.getDimension().getDataType() == DataTypes.STRING
+            || dim.getDimension().getDataType() == DataTypes.VARCHAR) {
+          this.isNoDictStringField[dim.getOrdinal()] = true;
+        }
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+            null);
+      } else if (dim.getDimension().isComplex()) {
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(dim.getDimension().getDataType()), true,
+            null);
+      } else {
+        fields[dim.getOrdinal()] = new StructField(dim.getColumnName(),
+            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(DataTypes.INT), true, null);
+      }
+    }
+
+    for (int i = 0; i < queryMeasures.size(); i++) {
+      ProjectionMeasure msr = queryMeasures.get(i);
+      DataType dataType = msr.getMeasure().getDataType();
+      if (dataType == DataTypes.BOOLEAN || dataType == DataTypes.SHORT ||
+          dataType == DataTypes.INT || dataType == DataTypes.LONG) {
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(msr.getMeasure().getDataType()), true,
+            null);
+      } else if (DataTypes.isDecimal(dataType)) {
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+            new DecimalType(msr.getMeasure().getPrecision(), msr.getMeasure().getScale()), true,
+            null);
+      } else {
+        fields[msr.getOrdinal()] = new StructField(msr.getColumnName(),
+            CarbonSparkDataSourceUtil.convertCarbonToSparkDataType(DataTypes.DOUBLE), true, null);
+      }
+    }
+
+    StructType schema = new StructType(fields);
+    if (partitionColumns != null) {
+      for (StructField field : partitionColumns.fields()) {
+        schema = schema.add(field);
+      }
+    }
+    columnarBatch = ColumnarBatch.allocate(schema, memMode);
+    if (partitionColumns != null) {
+      int partitionIdx = fields.length;
+      for (int i = 0; i < partitionColumns.fields().length; i++) {
+        ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
+        columnarBatch.column(i + partitionIdx).setIsConstant();
+      }
+    }
+    CarbonColumnVector[] vectors = new CarbonColumnVector[fields.length];
+    boolean[] filteredRows = new boolean[columnarBatch.capacity()];
+    for (int i = 0; i < fields.length; i++) {
+      if (isNoDictStringField[i]) {
+        columnarBatch.column(i).reserveDictionaryIds(columnarBatch.capacity());
+      }
+      vectors[i] = new ColumnarVectorWrapper(columnarBatch.column(i), filteredRows);
+    }
+    carbonColumnarBatch = new CarbonColumnarBatch(vectors, columnarBatch.capacity(), filteredRows);
+  }
+
+  private void initBatch() {
+    initBatch(DEFAULT_MEMORY_MODE, new StructType(), InternalRow.empty());
+  }
+
+  private void resultBatch() {
+    if (columnarBatch == null) initBatch();
+  }
+
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  private boolean nextBatch() {
+    if (null != isNoDictStringField) {
+      for (int i = 0; i < isNoDictStringField.length; i++) {
+        if (isNoDictStringField[i]) {
+          columnarBatch.column(i).getDictionaryIds().reset();
+        }
+      }
+    }
+    columnarBatch.reset();
+    carbonColumnarBatch.reset();
+    if (iterator.hasNext()) {
+      iterator.processNextBatch(carbonColumnarBatch);
+      int actualSize = carbonColumnarBatch.getActualSize();
+      columnarBatch.setNumRows(actualSize);
+      numBatched = actualSize;
+      batchIdx = 0;
+      return true;
+    }
+    return false;
+  }
+
+}
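
A hedged sketch of the read loop that a datasource typically drives against the reader above, assuming an already-initialized VectorizedCarbonRecordReader running in batch mode (constructing one requires a QueryModel built by the Carbon input format, which is omitted here).

import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
import org.apache.spark.sql.execution.vectorized.ColumnarBatch

object VectorReaderLoopSketch {
  // Drains the reader batch by batch and returns the total number of valid rows read.
  def readAll(reader: VectorizedCarbonRecordReader): Long = {
    var rows = 0L
    try {
      while (reader.nextKeyValue()) {
        // In batch mode, getCurrentValue returns a reused ColumnarBatch rather than a single row.
        val batch = reader.getCurrentValue.asInstanceOf[ColumnarBatch]
        rows += batch.numValidRows()
      }
    } finally {
      reader.close()
    }
    rows
  }
}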

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
new file mode 100644
index 0000000..8471181
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndex.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import java.io.IOException
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.execution.datasources.{InMemoryFileIndex, _}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, HDFSCarbonFile}
+import org.apache.carbondata.core.readcommitter.LatestFilesReadCommittedScope
+import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat}
+
+/**
+ * A custom FileIndex implementation that uses Carbon's driver-side pruning feature to prune
+ * carbondata files using the carbonindex files.
+ */
+class CarbonFileIndex(
+    sparkSession: SparkSession,
+    dataSchema: StructType,
+    parameters: Map[String, String],
+    fileIndex: FileIndex)
+  extends FileIndex with AbstractCarbonFileIndex {
+
+  override def rootPaths: Seq[Path] = fileIndex.rootPaths
+
+  override def inputFiles: Array[String] = fileIndex.inputFiles
+
+  override def refresh(): Unit = fileIndex.refresh()
+
+  override def sizeInBytes: Long = fileIndex.sizeInBytes
+
+  override def partitionSchema: StructType = fileIndex.partitionSchema
+
+  /**
+   * Lists the pruned files after applying partition and data filters.
+   *
+   * @param partitionFilters filters on the partition columns
+   * @param dataFilters filters on the data columns
+   * @return pruned partition directories
+   */
+  override def listFiles(partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
+    val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
+    val directories =
+      method.invoke(
+        fileIndex,
+        partitionFilters,
+        dataFilters).asInstanceOf[Seq[PartitionDirectory]]
+    prune(dataFilters, directories)
+  }
+
+  private def prune(dataFilters: Seq[Expression],
+      directories: Seq[PartitionDirectory]) = {
+    val tablePath = parameters.get("path")
+    if (tablePath.nonEmpty) {
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
+      // convert to Spark's source filters
+      val filters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
+
+      // convert to carbon filter expressions
+      val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+        CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
+      }.reduceOption(new AndExpression(_, _))
+      val model = CarbonSparkDataSourceUtil.prepareLoadModel(parameters, dataSchema)
+      CarbonInputFormat.setTableInfo(
+        hadoopConf,
+        model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
+      CarbonInputFormat.setTransactionalTable(hadoopConf, false)
+      var totalFiles = 0
+      val indexFiles = directories.flatMap { dir =>
+        totalFiles += dir.files.length
+        dir.files.filter{f =>
+          f.getPath.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+          f.getPath.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)}.
+          map(new HDFSCarbonFile(_))
+      }.toArray.asInstanceOf[Array[CarbonFile]]
+      if (indexFiles.length == 0 && totalFiles > 0) {
+        throw new IOException("No index files are present in the table location: " + tablePath.get)
+      }
+      CarbonInputFormat.setReadCommittedScope(
+        hadoopConf,
+        new LatestFilesReadCommittedScope(indexFiles))
+      filter match {
+        case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+        case None => None
+      }
+      val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+      val jobConf = new JobConf(hadoopConf)
+      SparkHadoopUtil.get.addCredentials(jobConf)
+      val splits = format.getSplits(Job.getInstance(jobConf))
+        .asInstanceOf[util.List[CarbonInputSplit]].asScala
+      val prunedDirs = directories.map { dir =>
+        val files = dir.files
+          .filter(d => splits.exists(_.getBlockPath.equalsIgnoreCase(d.getPath.getName)))
+        PartitionDirectory(dir.values, files)
+      }
+      prunedDirs
+    } else {
+      directories
+    }
+  }
+
+  override def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory] = {
+    val method = fileIndex.getClass.getMethods.find(_.getName == "listFiles").get
+    val directories =
+      method.invoke(fileIndex, filters).asInstanceOf[Seq[PartitionDirectory]]
+    prune(filters, directories)
+  }
+}
+
+/**
+ * A trait that exists only to keep the code compiling against both Spark 2.1 and 2.2.
+ */
+trait AbstractCarbonFileIndex {
+
+  def listFiles(partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): Seq[PartitionDirectory]
+
+  def listFiles(filters: Seq[Expression]): Seq[PartitionDirectory]
+}
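
A hedged construction sketch for the index above, wrapping Spark's InMemoryFileIndex so that listFiles() additionally prunes data files using the carbonindex metadata; the path, schema, and options are illustrative, and a running SparkSession is assumed.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndex
import org.apache.spark.sql.execution.datasources.InMemoryFileIndex
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object CarbonFileIndexSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("carbon-file-index-sketch").getOrCreate()
    val tablePath = "/tmp/carbon_table"                              // illustrative location
    val dataSchema = StructType(Seq(StructField("id", LongType)))    // illustrative schema
    val innerIndex = new InMemoryFileIndex(
      spark, Seq(new Path(tablePath)), Map("path" -> tablePath), None)
    // Wrap Spark's index; pruning via carbonindex files happens inside listFiles().
    val carbonIndex = new CarbonFileIndex(spark, dataSchema, Map("path" -> tablePath), innerIndex)
    println(carbonIndex.inputFiles.length)
    spark.stop()
  }
}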

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
new file mode 100644
index 0000000..ed67f48
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonFileIndexReplaceRule.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, InMemoryFileIndex, InsertIntoHadoopFsRelationCommand, LogicalRelation}
+import org.apache.spark.sql.sources.BaseRelation
+
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/**
+ * Rule to replace FileIndex with CarbonFileIndex for better driver pruning.
+ */
+class CarbonFileIndexReplaceRule extends Rule[LogicalPlan] {
+
+  /**
+   * This property creates a subfolder for every load.
+   */
+  private val createSubFolder = CarbonProperties.getInstance()
+    .getProperty("carbonfileformat.create.folder.perload", "false").toBoolean
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    val transformedPlan = plan.transform {
+      case l: LogicalRelation
+        if l.relation.isInstanceOf[HadoopFsRelation] &&
+           l.relation.asInstanceOf[HadoopFsRelation].fileFormat.toString.equals("carbon") &&
+           !l.relation.asInstanceOf[HadoopFsRelation].location.isInstanceOf[CarbonFileIndex] =>
+        val fsRelation = l.relation.asInstanceOf[HadoopFsRelation]
+        val fileIndex = fsRelation.location
+        val carbonFileIndex = new CarbonFileIndex(fsRelation.sparkSession,
+          fsRelation.schema,
+          fsRelation.options,
+          updateFileIndex(fileIndex, fsRelation))
+        val fsRelationCopy = fsRelation.copy(location = carbonFileIndex)(fsRelation.sparkSession)
+        val logicalRelation = l.copy(relation = fsRelationCopy.asInstanceOf[BaseRelation])
+        logicalRelation
+      case insert: InsertIntoHadoopFsRelationCommand
+        if createSubFolder && insert.fileFormat.toString.equals("carbon") &&
+           FileFactory.getUpdatedFilePath(insert.outputPath.toString).equals(
+             FileFactory.getUpdatedFilePath(insert.options("path"))) &&
+           insert.partitionColumns.isEmpty =>
+        val path = new Path(insert.outputPath, System.nanoTime().toString)
+        insert.copy(outputPath = path)
+    }
+    transformedPlan
+  }
+
+  private def updateFileIndex(fileIndex: FileIndex,
+      hadoopFsRelation: HadoopFsRelation): FileIndex = {
+    if (fileIndex.isInstanceOf[InMemoryFileIndex] && fileIndex.rootPaths.length == 1) {
+      val carbonFile = FileFactory.getCarbonFile(fileIndex.rootPaths.head.toUri.toString)
+      val carbonFiles = carbonFile.listFiles()
+      if (carbonFiles.nonEmpty &&
+          !carbonFiles.exists(_.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT))) {
+        val paths = carbonFiles.map(p => new Path(p.getAbsolutePath)).toSeq
+        new InMemoryFileIndex(hadoopFsRelation.sparkSession,
+          paths,
+          hadoopFsRelation.options,
+          Some(hadoopFsRelation.partitionSchema))
+      } else {
+        fileIndex
+      }
+    } else {
+      fileIndex
+    }
+  }
+}
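
One way an optimizer rule like the one above can be attached to a session is through Spark's experimental methods; the sketch below shows that registration pattern only as an assumption, since how the Carbon datasource actually wires the rule in is not visible in this part of the patch.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule

object RegisterCarbonRuleSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("carbon-rule-sketch").getOrCreate()
    // Append the rule so HadoopFsRelation plans over the "carbon" format get their
    // FileIndex replaced with CarbonFileIndex during optimization.
    spark.experimental.extraOptimizations =
      spark.experimental.extraOptimizations :+ new CarbonFileIndexReplaceRule
    spark.stop()
  }
}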

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
new file mode 100644
index 0000000..8724fd1
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/CarbonSparkDataSourceUtil.scala
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.datatype
+import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, DecimalType => CarbonDecimalType, StructField => CarbonStructField}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
+import org.apache.carbondata.core.scan.expression.conditional._
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.sdk.file.{CarbonWriterBuilder, Field, Schema}
+
+object CarbonSparkDataSourceUtil {
+
+  /**
+   * Converts a Carbon data type to the corresponding Spark data type.
+   */
+  def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
+    if (CarbonDataTypes.isDecimal(dataType)) {
+      DecimalType(dataType.asInstanceOf[CarbonDecimalType].getPrecision,
+        dataType.asInstanceOf[CarbonDecimalType].getScale)
+    } else {
+      dataType match {
+        case CarbonDataTypes.STRING => StringType
+        case CarbonDataTypes.SHORT => ShortType
+        case CarbonDataTypes.INT => IntegerType
+        case CarbonDataTypes.LONG => LongType
+        case CarbonDataTypes.DOUBLE => DoubleType
+        case CarbonDataTypes.BOOLEAN => BooleanType
+        case CarbonDataTypes.TIMESTAMP => TimestampType
+        case CarbonDataTypes.DATE => DateType
+        case CarbonDataTypes.VARCHAR => StringType
+      }
+    }
+  }
+
+  /**
+   * Converts a Spark data type to the corresponding Carbon data type.
+   */
+  def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
+    dataType match {
+      case StringType => CarbonDataTypes.STRING
+      case ShortType => CarbonDataTypes.SHORT
+      case IntegerType => CarbonDataTypes.INT
+      case LongType => CarbonDataTypes.LONG
+      case DoubleType => CarbonDataTypes.DOUBLE
+      case FloatType => CarbonDataTypes.FLOAT
+      case DateType => CarbonDataTypes.DATE
+      case BooleanType => CarbonDataTypes.BOOLEAN
+      case TimestampType => CarbonDataTypes.TIMESTAMP
+      case ArrayType(elementType, _) =>
+        CarbonDataTypes.createArrayType(convertSparkToCarbonDataType(elementType))
+      case StructType(fields) =>
+        val carbonFields = new java.util.ArrayList[CarbonStructField]
+        fields.map { field =>
+          carbonFields.add(
+            new CarbonStructField(
+              field.name,
+              convertSparkToCarbonDataType(field.dataType)))
+        }
+        CarbonDataTypes.createStructType(carbonFields)
+      case NullType => CarbonDataTypes.NULL
+      case decimal: DecimalType =>
+        CarbonDataTypes.createDecimalType(decimal.precision, decimal.scale)
+      case _ => throw new UnsupportedOperationException("getting " + dataType + " from spark")
+    }
+  }
+
+  /**
+   * Converts data source filters to carbon filter predicates.
+   */
+  def createCarbonFilter(schema: StructType,
+      predicate: sources.Filter): Option[CarbonExpression] = {
+    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
+      predicate match {
+
+        case sources.EqualTo(name, value) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.Not(sources.EqualTo(name, value)) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.EqualNullSafe(name, value) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.Not(sources.EqualNullSafe(name, value)) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.GreaterThan(name, value) =>
+          Some(new GreaterThanExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.LessThan(name, value) =>
+          Some(new LessThanExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.GreaterThanOrEqual(name, value) =>
+          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.LessThanOrEqual(name, value) =>
+          Some(new LessThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.In(name, values) =>
+          if (values.length == 1 && values(0) == null) {
+            Some(new FalseExpression(getCarbonExpression(name)))
+          } else {
+            Some(new InExpression(getCarbonExpression(name),
+              new ListExpression(
+                convertToJavaList(values.filterNot(_ == null)
+                  .map(filterValues => getCarbonLiteralExpression(name, filterValues)).toList))))
+          }
+        case sources.Not(sources.In(name, values)) =>
+          if (values.contains(null)) {
+            Some(new FalseExpression(getCarbonExpression(name)))
+          } else {
+            Some(new NotInExpression(getCarbonExpression(name),
+              new ListExpression(
+                convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+          }
+        case sources.IsNull(name) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, null), true))
+        case sources.IsNotNull(name) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, null), true))
+        case sources.And(lhs, rhs) =>
+          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
+        case sources.Or(lhs, rhs) =>
+          for {
+            lhsFilter <- createFilter(lhs)
+            rhsFilter <- createFilter(rhs)
+          } yield {
+            new OrExpression(lhsFilter, rhsFilter)
+          }
+        case sources.StringStartsWith(name, value) if value.length > 0 =>
+          Some(new StartsWithExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case _ => None
+      }
+    }
+
+    def getCarbonExpression(name: String) = {
+      new CarbonColumnExpression(name,
+        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+    }
+
+    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
+      val dataTypeOfAttribute =
+        CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(dataTypeOf(name))
+      val dataType =
+        if (Option(value).isDefined &&
+            dataTypeOfAttribute == CarbonDataTypes.STRING &&
+            value.isInstanceOf[Double]) {
+          CarbonDataTypes.DOUBLE
+        } else {
+          dataTypeOfAttribute
+        }
+      new CarbonLiteralExpression(value, dataType)
+    }
+
+    createFilter(predicate)
+  }
+
+  // Convert a Scala list to a Java list. scalaList.asJava cannot be used here because the
+  // wrapper class is not found during deserialization and a ClassNotFoundException is thrown.
+  def convertToJavaList(
+      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
+    val javaList = new java.util.ArrayList[CarbonExpression]()
+    scalaList.foreach(javaList.add)
+    javaList
+  }
+
+  /**
+   * Create a carbon load model from the data source options and the data schema
+   */
+  def prepareLoadModel(options: Map[String, String],
+      dataSchema: StructType): CarbonLoadModel = {
+    val schema = new Schema(dataSchema.fields.map { field =>
+      field.dataType match {
+        case s: StructType =>
+          new Field(field.name,
+            field.dataType.typeName,
+            s.fields
+              .map(f => new datatype.StructField(f.name,
+                CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(f.dataType))).toList.asJava)
+        case a: ArrayType =>
+          new Field(field.name,
+            field.dataType.typeName,
+            Seq(new datatype.StructField(field.name,
+              CarbonSparkDataSourceUtil.convertSparkToCarbonDataType(a.elementType))).toList.asJava)
+        case other =>
+          new Field(field.name, field.dataType.simpleString)
+      }
+    })
+    val builder = new CarbonWriterBuilder
+    builder.isTransactionalTable(false)
+    builder.outputPath(options.getOrElse("path", ""))
+    val blockSize = options.get(CarbonCommonConstants.TABLE_BLOCKSIZE).map(_.toInt)
+    if (blockSize.isDefined) {
+      builder.withBlockSize(blockSize.get)
+    }
+    val blockletSize = options.get("table_blockletsize").map(_.toInt)
+    if (blockletSize.isDefined) {
+      builder.withBlockletSize(blockletSize.get)
+    }
+    builder.enableLocalDictionary(options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE,
+      CarbonCommonConstants.LOCAL_DICTIONARY_ENABLE_DEFAULT).toBoolean)
+    builder.localDictionaryThreshold(
+      options.getOrElse(CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD,
+        CarbonCommonConstants.LOCAL_DICTIONARY_THRESHOLD_DEFAULT).toInt)
+    builder.sortBy(
+      options.get(CarbonCommonConstants.SORT_COLUMNS).map(_.split(",").map(_.trim)).orNull)
+    builder.uniqueIdentifier(System.currentTimeMillis())
+    val model = builder.buildLoadModel(schema)
+    val tableInfo = model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo
+    val properties =
+      tableInfo.getFactTable.getTableProperties
+    // Add the column meta cache property
+    options.foreach { case (key, value) =>
+      if (key.equalsIgnoreCase(CarbonCommonConstants.COLUMN_META_CACHE)) {
+        val columnsToBeCached = value.split(",").map(x => x.trim.toLowerCase).toSeq
+        // arrange the columns in create table order and then add them to the table properties
+        val createOrder =
+          tableInfo.getFactTable.getListOfColumns.asScala.map(_.getColumnName).filter(
+            col => columnsToBeCached.contains(col))
+        properties.put(CarbonCommonConstants.COLUMN_META_CACHE, createOrder.mkString(","))
+      }
+    }
+    model
+  }
+}
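
For illustration only (not part of the patch), here is a minimal sketch of how the filter
conversion above could be exercised; the schema, column names and filter values are
illustrative assumptions:

  import org.apache.spark.sql.sources
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil

  // Illustrative schema; any column names and types would do.
  val schema = StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType)))

  // A data source filter pushed down by Spark ...
  val sparkFilter = sources.And(sources.EqualTo("name", "bob"), sources.GreaterThan("age", 20))

  // ... is mapped to an optional carbon expression tree (None when unsupported).
  val carbonFilter = CarbonSparkDataSourceUtil.createCarbonFilter(schema, sparkFilter)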

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
new file mode 100644
index 0000000..d321cab
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql._
+import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
+import org.apache.spark.sql.catalyst.util.ArrayData
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.util.SparkTypeConverter
+import org.apache.spark.util.SerializableConfiguration
+
+import org.apache.carbondata.common.annotations.{InterfaceAudience, InterfaceStability}
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.converter.SparkDataTypeConverterImpl
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.BlockletDetailInfo
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.{Expression => CarbonExpression}
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.core.statusmanager.{FileFormat => CarbonFileFormatVersion}
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection, CarbonRecordReader}
+import org.apache.carbondata.hadoop.api.{CarbonFileInputFormat, CarbonInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
+import org.apache.carbondata.processing.loading.complexobjects.{ArrayObject, StructObject}
+import org.apache.carbondata.spark.vectorreader.VectorizedCarbonRecordReader
+
+/**
+ * Used to read and write data stored in carbondata files from/to the Spark execution engine.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+class SparkCarbonFileFormat extends FileFormat
+  with DataSourceRegister
+  with Logging
+  with Serializable {
+
+  @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  /**
+   * If the user does not provide a schema while reading the data, Spark calls this method to infer
+   * the schema from the carbondata files. It reads the schema present in the carbondata files and returns it.
+   */
+  override def inferSchema(sparkSession: SparkSession,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Option[StructType] = {
+    val tablePath = options.get("path") match {
+      case Some(path) => path
+      case _ => FileFactory.getUpdatedFilePath(files.head.getPath.getParent.toUri.toString)
+    }
+
+    val tableInfo = SchemaReader.inferSchema(AbsoluteTableIdentifier.from(tablePath, "", ""), false)
+    val table = CarbonTable.buildFromTableInfo(tableInfo)
+    var schema = new StructType
+    tableInfo.getFactTable.getListOfColumns.asScala.foreach { col =>
+      // TODO find a better way to know whether it is a child column
+      if (!col.getColumnName.contains(".")) {
+        schema = schema.add(
+          col.getColumnName,
+          SparkTypeConverter.convertCarbonToSparkDataType(col, table))
+      }
+    }
+    Some(schema)
+  }
+
+  /**
+   * Prepares a write job and returns an [[OutputWriterFactory]]. Client-side job preparation is
+   * done here.
+   */
+  override def prepareWrite(sparkSession: SparkSession,
+      job: Job,
+      options: Map[String, String],
+      dataSchema: StructType): OutputWriterFactory = {
+
+    val conf = job.getConfiguration
+
+    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+    model.setLoadWithoutConverterStep(true)
+    CarbonTableOutputFormat.setLoadModel(conf, model)
+
+    new OutputWriterFactory {
+      override def newInstance(
+          path: String,
+          dataSchema: StructType,
+          context: TaskAttemptContext): OutputWriter = {
+        val updatedPath = if (path.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+          new Path(path).getParent.toString
+        } else {
+          path
+        }
+        context.getConfiguration.set("carbon.outputformat.writepath", updatedPath)
+        context.getConfiguration.set("carbon.outputformat.taskno", System.nanoTime() + "")
+        new CarbonOutputWriter(path, context, dataSchema.fields)
+      }
+
+      override def getFileExtension(context: TaskAttemptContext): String = {
+        CarbonTablePath.CARBON_DATA_EXT
+      }
+    }
+  }
+
+  /**
+   * A small trait that keeps the writer compiling against both Spark 2.1 and 2.2
+   */
+  private trait AbstractCarbonOutputWriter {
+    def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal")
+    def writeInternal(row: InternalRow): Unit = {
+      writeCarbon(row)
+    }
+    def write(row: InternalRow): Unit = {
+      writeCarbon(row)
+    }
+    def writeCarbon(row: InternalRow): Unit
+  }
+
+
+  /**
+   * Writer class for carbondata files
+   */
+  private class CarbonOutputWriter(path: String,
+      context: TaskAttemptContext,
+      fieldTypes: Array[StructField]) extends OutputWriter with AbstractCarbonOutputWriter {
+
+    private val writable = new ObjectArrayWritable
+
+    private val cutOffDate = Integer.MAX_VALUE >> 1
+
+    private val recordWriter: RecordWriter[NullWritable, ObjectArrayWritable] =
+      new CarbonTableOutputFormat().getRecordWriter(context)
+
+    /**
+     * Write Spark's internal row to the carbondata record writer
+     */
+    def writeCarbon(row: InternalRow): Unit = {
+      val data: Array[AnyRef] = extractData(row, fieldTypes)
+      writable.set(data)
+      recordWriter.write(NullWritable.get(), writable)
+    }
+
+    override def writeInternal(row: InternalRow): Unit = {
+      writeCarbon(row)
+    }
+
+    /**
+     * Convert the internal row to carbondata understandable object
+     */
+    private def extractData(row: InternalRow, fieldTypes: Array[StructField]): Array[AnyRef] = {
+      val data = new Array[AnyRef](fieldTypes.length)
+      var i = 0
+      while (i < fieldTypes.length) {
+        if (!row.isNullAt(i)) {
+          fieldTypes(i).dataType match {
+            case StringType =>
+              data(i) = row.getString(i)
+            case d: DecimalType =>
+              data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+            case s: StructType =>
+              data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+            case s: ArrayType =>
+              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+            case d: DateType =>
+              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+            case d: TimestampType =>
+              data(i) = (row.getLong(i) / 1000).asInstanceOf[AnyRef]
+            case other =>
+              data(i) = row.get(i, other)
+          }
+        } else {
+          setNull(fieldTypes(i).dataType, data, i)
+        }
+        i += 1
+      }
+      data
+    }
+
+    private def setNull(dataType: DataType, data: Array[AnyRef], i: Int) = {
+      dataType match {
+        case d: DateType =>
+          // 1 is treated as null for date values in carbon
+          data(i) = 1.asInstanceOf[AnyRef]
+        case _ =>
+      }
+    }
+
+    /**
+     * Convert the internal row to carbondata understandable object
+     */
+    private def extractData(row: ArrayData, dataType: DataType): Array[AnyRef] = {
+      val data = new Array[AnyRef](row.numElements())
+      var i = 0
+      while (i < data.length) {
+        if (!row.isNullAt(i)) {
+          dataType match {
+            case d: DecimalType =>
+              data(i) = row.getDecimal(i, d.precision, d.scale).toJavaBigDecimal
+            case s: StructType =>
+              data(i) = new StructObject(extractData(row.getStruct(i, s.fields.length), s.fields))
+            case s: ArrayType =>
+              data(i) = new ArrayObject(extractData(row.getArray(i), s.elementType))
+            case d: DateType =>
+              data(i) = (row.getInt(i) + cutOffDate).asInstanceOf[AnyRef]
+            case other => data(i) = row.get(i, dataType)
+          }
+        } else {
+          setNull(dataType, data, i)
+        }
+        i += 1
+      }
+      data
+    }
+
+    override def close(): Unit = {
+      recordWriter.close(context)
+    }
+  }
+
+  override def shortName(): String = "carbon"
+
+  override def toString: String = "carbon"
+
+  override def hashCode(): Int = getClass.hashCode()
+
+  override def equals(other: Any): Boolean = other.isInstanceOf[SparkCarbonFileFormat]
+
+  /**
+   * Whether to use the vectorized reader while reading data.
+   * It is not supported for complex types.
+   */
+  private def supportVector(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val vectorizedReader = {
+      if (sparkSession.sqlContext.sparkSession.conf
+        .contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) {
+        sparkSession.sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+      } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
+      } else {
+        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+          CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+      }
+    }
+    vectorizedReader.toBoolean && schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
+
+  /**
+   * Returns whether this format supports returning a columnar batch or not.
+   */
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+    val conf = sparkSession.sessionState.conf
+    conf.wholeStageEnabled &&
+    schema.length <= conf.wholeStageMaxNumFields &&
+    schema.forall(_.dataType.isInstanceOf[AtomicType])
+  }
+
+  /**
+   * Returns a function that can be used to read a single carbondata file as an
+   * Iterator of InternalRow.
+   */
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+      dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
+      filters: Seq[Filter],
+      options: Map[String, String],
+      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
+    val filter: Option[CarbonExpression] = filters.flatMap { filter =>
+      CarbonSparkDataSourceUtil.createCarbonFilter(dataSchema, filter)
+    }.reduceOption(new AndExpression(_, _))
+
+    val projection = requiredSchema.map(_.name).toArray
+    val carbonProjection = new CarbonProjection
+    projection.foreach(carbonProjection.addColumn)
+
+    var supportBatchValue: Boolean = false
+
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val readVector = supportVector(sparkSession, resultSchema)
+    if (readVector) {
+      supportBatchValue = supportBatch(sparkSession, resultSchema)
+    }
+    val model = CarbonSparkDataSourceUtil.prepareLoadModel(options, dataSchema)
+    CarbonInputFormat
+      .setTableInfo(hadoopConf, model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
+    CarbonInputFormat.setTransactionalTable(hadoopConf, false)
+    CarbonInputFormat.setColumnProjection(hadoopConf, carbonProjection)
+    filter match {
+      case Some(c) => CarbonInputFormat.setFilterPredicates(hadoopConf, c)
+      case None => None
+    }
+    val format: CarbonFileInputFormat[Object] = new CarbonFileInputFormat[Object]
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    file: PartitionedFile => {
+      assert(file.partitionValues.numFields == partitionSchema.size)
+
+      if (file.filePath.endsWith(CarbonTablePath.CARBON_DATA_EXT)) {
+        val split = new CarbonInputSplit("null",
+          new Path(file.filePath),
+          file.start,
+          file.length,
+          file.locations,
+          CarbonFileFormatVersion.COLUMNAR_V3)
+        // Only the V3 format version onwards is supported.
+        split.setVersion(ColumnarFormatVersion.V3)
+        val info = new BlockletDetailInfo()
+        split.setDetailInfo(info)
+        info.setBlockSize(file.length)
+        // Read the footer offset from the last 8 bytes of the file and set it.
+        val reader = FileFactory.getFileHolder(FileFactory.getFileType(file.filePath))
+        val buffer = reader
+          .readByteBuffer(FileFactory.getUpdatedFilePath(file.filePath), file.length - 8, 8)
+        info.setBlockFooterOffset(buffer.getLong)
+        info.setVersionNumber(split.getVersion.number())
+        info.setUseMinMaxForPruning(true)
+        reader.finish()
+        val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
+        val hadoopAttemptContext =
+          new TaskAttemptContextImpl(broadcastedHadoopConf.value.value, attemptId)
+        val model = format.createQueryModel(split, hadoopAttemptContext)
+        model.setConverter(new SparkDataTypeConverterImpl)
+        val carbonReader = if (readVector) {
+          val vectorizedReader = new VectorizedCarbonRecordReader(model,
+            null,
+            supportBatchValue.toString)
+          vectorizedReader.initialize(split, hadoopAttemptContext)
+          vectorizedReader.initBatch(MemoryMode.ON_HEAP, partitionSchema, file.partitionValues)
+          logDebug(s"Appending $partitionSchema ${ file.partitionValues }")
+          vectorizedReader
+        } else {
+          val reader = new CarbonRecordReader(model,
+            new SparkUnsafeRowReadSuport(requiredSchema), null)
+          reader.initialize(split, hadoopAttemptContext)
+          reader
+        }
+
+        val iter = new RecordReaderIterator(carbonReader)
+        Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => iter.close()))
+
+        if (carbonReader.isInstanceOf[VectorizedCarbonRecordReader] && readVector) {
+          iter.asInstanceOf[Iterator[InternalRow]]
+        } else {
+          val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
+          val joinedRow = new JoinedRow()
+          val appendPartitionColumns = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
+          if (partitionSchema.length == 0) {
+            // There are no partition columns
+            iter.asInstanceOf[Iterator[InternalRow]]
+          } else {
+            iter.asInstanceOf[Iterator[InternalRow]]
+              .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+          }
+        }
+      } else {
+        Iterator.empty
+      }
+    }
+  }
+
+
+}
+
+
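
Since the format registers the short name "carbon" through DataSourceRegister, it plugs into the
standard DataFrame reader/writer API. A minimal usage sketch (not part of the patch), assuming the
format is registered on the classpath, an existing SparkSession named spark, and an illustrative
output path:

  // Write a DataFrame as carbondata files through the FileFormat above;
  // the output path is an illustrative assumption.
  val df = spark.range(0, 100).selectExpr("id", "cast(id as string) as name")
  df.write.format("carbon").save("/tmp/carbon_datasource_example")

  // Read it back; when no schema is supplied, inferSchema reads it from the carbondata files.
  val readBack = spark.read.format("carbon").load("/tmp/carbon_datasource_example")
  readBack.filter("id > 10").show()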

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala
new file mode 100644
index 0000000..cffde6c
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/readsupport/SparkUnsafeRowReadSuport.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.carbondata.execution.datasources.readsupport
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection}
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport
+
+/**
+ * Read support class which converts rows in carbon's array format to Spark's InternalRow.
+ */
+class SparkUnsafeRowReadSuport(requiredSchema: StructType) extends CarbonReadSupport[InternalRow] {
+  private val unsafeProjection = UnsafeProjection.create(requiredSchema)
+  override def initialize(carbonColumns: Array[CarbonColumn],
+      carbonTable: CarbonTable): Unit = {
+  }
+
+  override def readRow(data: Array[AnyRef]): InternalRow = {
+    unsafeProjection(new GenericInternalRow(data.asInstanceOf[Array[Any]]))
+  }
+
+  override def close(): Unit = {
+    // Nothing to close
+  }
+}
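
A minimal sketch (not part of the patch) of what the read support does, assuming an illustrative
schema and values already in Spark's internal representation (e.g. UTF8String for strings), which
the carbon-to-spark data type converter supplies in practice:

  import org.apache.spark.sql.carbondata.execution.datasources.readsupport.SparkUnsafeRowReadSuport
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
  import org.apache.spark.unsafe.types.UTF8String

  // Illustrative two-column schema.
  val requiredSchema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("name", StringType)))

  // Each carbon row arrives as Array[AnyRef] and is projected into an UnsafeRow.
  val readSupport = new SparkUnsafeRowReadSuport(requiredSchema)
  val unsafeRow = readSupport.readRow(Array[AnyRef](Integer.valueOf(1), UTF8String.fromString("a")))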

http://git-wip-us.apache.org/repos/asf/carbondata/blob/347b8e1d/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala
new file mode 100644
index 0000000..89eb622
--- /dev/null
+++ b/integration/spark-datasource/src/main/scala/org/apache/spark/sql/util/CarbonMetastoreTypes.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.util
+
+import scala.util.parsing.combinator.RegexParsers
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.types._
+
+object CarbonMetastoreTypes extends RegexParsers {
+  protected lazy val primitiveType: Parser[DataType] =
+    "string" ^^^ StringType |
+    "varchar" ^^^ StringType |
+    "float" ^^^ FloatType |
+    "int" ^^^ IntegerType |
+    "tinyint" ^^^ ShortType |
+    "short" ^^^ ShortType |
+    "double" ^^^ DoubleType |
+    "long" ^^^ LongType |
+    "binary" ^^^ BinaryType |
+    "boolean" ^^^ BooleanType |
+    fixedDecimalType |
+    "decimal" ^^^ DecimalType(10, 0) |
+    "varchar\\((\\d+)\\)".r ^^^ StringType |
+    "date" ^^^ DateType |
+    "timestamp" ^^^ TimestampType
+
+  protected lazy val fixedDecimalType: Parser[DataType] =
+    "decimal" ~> "(" ~> "^[1-9]\\d*".r ~ ("," ~> "^[0-9]\\d*".r <~ ")") ^^ {
+      case precision ~ scale =>
+        DecimalType(precision.toInt, scale.toInt)
+    }
+
+  protected lazy val arrayType: Parser[DataType] =
+    "array" ~> "<" ~> dataType <~ ">" ^^ {
+      case tpe => ArrayType(tpe)
+    }
+
+  protected lazy val mapType: Parser[DataType] =
+    "map" ~> "<" ~> dataType ~ "," ~ dataType <~ ">" ^^ {
+      case t1 ~ _ ~ t2 => MapType(t1, t2)
+    }
+
+  protected lazy val structField: Parser[StructField] =
+    "[a-zA-Z0-9_]*".r ~ ":" ~ dataType ^^ {
+      case name ~ _ ~ tpe => StructField(name, tpe, nullable = true)
+    }
+
+  protected lazy val structType: Parser[DataType] =
+    "struct" ~> "<" ~> repsep(structField, ",") <~ ">" ^^ {
+      case fields => StructType(fields)
+    }
+
+  protected lazy val dataType: Parser[DataType] =
+    arrayType |
+    mapType |
+    structType |
+    primitiveType
+
+  def toDataType(metastoreType: String): DataType = {
+    parseAll(dataType, metastoreType) match {
+      case Success(result, _) => result
+      case _: NoSuccess =>
+        throw new AnalysisException(s"Unsupported dataType: $metastoreType")
+    }
+  }
+
+  def toMetastoreType(dt: DataType): String = {
+    dt match {
+      case ArrayType(elementType, _) => s"array<${ toMetastoreType(elementType) }>"
+      case StructType(fields) =>
+        s"struct<${
+          fields.map(f => s"${ f.name }:${ toMetastoreType(f.dataType) }")
+            .mkString(",")
+        }>"
+      case StringType => "string"
+      case FloatType => "float"
+      case IntegerType => "int"
+      case ShortType => "tinyint"
+      case DoubleType => "double"
+      case LongType => "bigint"
+      case BinaryType => "binary"
+      case BooleanType => "boolean"
+      case DecimalType() => "decimal"
+      case TimestampType => "timestamp"
+      case DateType => "date"
+    }
+  }
+}
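
A minimal round-trip sketch (not part of the patch) for the parser above; the type string is an
illustrative assumption:

  import org.apache.spark.sql.util.CarbonMetastoreTypes

  // Parse a metastore type string into a Spark DataType ...
  val dt = CarbonMetastoreTypes.toDataType("struct<id:int,tags:array<string>>")
  // ... and render it back into metastore notation; this round-trips to the same string.
  val str = CarbonMetastoreTypes.toMetastoreType(dt)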