Posted to commits@carbondata.apache.org by ja...@apache.org on 2016/11/30 17:46:51 UTC

[5/6] incubator-carbondata git commit: add spark2 module

add spark2 module

rebase

carbon api for spark2

fix late decoder

fix

comment on path

merged caiqiang's fix for decode rule

added test cases

fix style

fix pom

fix 1.5

1.5 example issue

fix 1.5 testsuite

fix 1.5 tests

imports


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/590ecceb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/590ecceb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/590ecceb

Branch: refs/heads/master
Commit: 590ecceb148dfe19ca3f16766e64dedf3646cf59
Parents: d94b99f
Author: jackylk <ja...@huawei.com>
Authored: Wed Nov 30 16:28:34 2016 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Thu Dec 1 01:45:05 2016 +0800

----------------------------------------------------------------------
 conf/dataload.properties.template               |   73 ++
 core/pom.xml                                    |    7 -
 .../examples/GenerateDictionaryExample.scala    |    7 +-
 examples/spark2/src/main/resources/data.csv     |   11 +
 .../carbondata/examples/CarbonExample.scala     |  109 ++
 hadoop/pom.xml                                  |    7 -
 .../carbondata/hadoop/CarbonProjection.java     |    5 +-
 .../AbstractDictionaryDecodedReadSupport.java   |    2 +
 integration-testcases/pom.xml                   |    8 -
 integration/spark-common/pom.xml                |   10 +-
 .../spark/merger/CarbonCompactionUtil.java      |    1 -
 .../readsupport/SparkRowReadSupportImpl.java    |    9 +-
 .../spark/rdd/CarbonDataLoadRDD.scala           |    3 +-
 .../spark/rdd/DataLoadPartitionCoalescer.scala  |    2 +-
 .../carbondata/spark/util/CommonUtil.scala      |    3 +-
 .../spark/util/GlobalDictionaryUtil.scala       |    2 +-
 .../CarbonTableIdentifierImplicit.scala         |    2 -
 .../spark/sql/hive/DistributionUtil.scala       |   17 +-
 integration/spark/pom.xml                       |    7 -
 .../spark/sql/CarbonDatasourceRelation.scala    |    1 +
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   10 +-
 .../scala/org/apache/spark/sql/CarbonScan.scala |    1 -
 .../org/apache/spark/sql/CarbonSparkUtil.scala  |    3 +-
 integration/spark2/pom.xml                      |    5 +
 .../spark/load/DeleteLoadFromMetadata.java      |   44 +
 .../carbondata/spark/util/CarbonQueryUtil.java  |  248 ++++
 .../spark/CarbonColumnValidator.scala           |   36 +
 .../apache/carbondata/spark/CarbonFilters.scala |  391 ++++++
 .../apache/carbondata/spark/CarbonOption.scala  |   48 +
 .../carbondata/spark/CarbonSparkFactory.scala   |   60 +
 .../spark/DictionaryDetailHelper.scala          |   62 +
 .../org/apache/carbondata/spark/KeyVal.scala    |   89 ++
 .../spark/rdd/CarbonDataRDDFactory.scala        | 1115 ++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |  250 ++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  305 +++++
 .../carbondata/spark/util/CarbonSparkUtil.scala |   45 +
 .../carbondata/spark/util/QueryPlanUtil.scala   |   56 +
 .../apache/spark/repl/CarbonSparkILoop.scala    |   72 ++
 .../spark/sql/CarbonCatalystOperators.scala     |   98 ++
 .../spark/sql/CarbonDataFrameWriter.scala       |  168 +++
 .../sql/CarbonDatasourceHadoopRelation.scala    |   78 ++
 .../spark/sql/CarbonDictionaryDecoder.scala     |  222 ++++
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |   59 +
 .../scala/org/apache/spark/sql/CarbonScan.scala |   11 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  143 +++
 .../spark/sql/SparkUnknownExpression.scala      |  130 ++
 .../org/apache/spark/sql/TableCreator.scala     |  490 ++++++++
 .../execution/CarbonLateDecodeStrategy.scala    |  128 +-
 .../execution/command/carbonTableSchema.scala   |  750 ++++++++++++
 .../spark/sql/hive/CarbonHiveMetadataUtil.scala |   56 +
 .../apache/spark/sql/hive/CarbonMetastore.scala |  803 +++++++++++++
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  796 +++++++++++++
 .../org/apache/spark/util/CleanFiles.scala      |   46 +
 .../org/apache/spark/util/Compaction.scala      |   46 +
 .../apache/spark/util/DeleteSegmentByDate.scala |   47 +
 .../apache/spark/util/DeleteSegmentById.scala   |   52 +
 .../org/apache/spark/util/ShowSegments.scala    |   82 ++
 .../org/apache/spark/util/TableAPIUtil.scala    |   54 +
 .../org/apache/spark/util/TableLoader.scala     |   93 ++
 .../carbondata/CarbonDataSourceSuite.scala      |   70 ++
 pom.xml                                         |   30 +-
 processing/pom.xml                              |    7 -
 .../lcm/status/SegmentStatusManager.java        |    7 +-
 63 files changed, 7402 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/conf/dataload.properties.template
----------------------------------------------------------------------
diff --git a/conf/dataload.properties.template b/conf/dataload.properties.template
new file mode 100644
index 0000000..59cad4a
--- /dev/null
+++ b/conf/dataload.properties.template
@@ -0,0 +1,73 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+#carbon store path
+# change this to match the checkout path on your local machine
+carbon.storelocation=/Users/wangfei/code/incubator-carbondata/examples/spark2/target/store
+
+#true: use kettle to load data
+#false: use new flow to load data
+use_kettle=true
+
+# change this to match the checkout path on your local machine
+carbon.kettle.home=/Users/wangfei/code/incubator-carbondata/processing/carbonplugins
+
+#csv delimiter character
+delimiter=,
+
+#csv quote character
+#quotechar=\"
+
+#csv file header
+#fileheader=
+
+#csv data escape character
+#escapechar=\\
+
+#csv comment character
+#commentchar=#
+
+#column dictionary list
+#columndict=
+
+#null value's serialization format  
+#serialization_null_format=\\N
+
+#bad records logger
+#bad_records_logger_enable=false
+
+#bad records action
+#bad_records_action=force
+
+#all dictionary folder path
+#all_dictionary_path=
+
+#complex column's level 1 delimiter
+#complex_delimiter_level_1=\\$
+
+#complex column's level 2 delimiter
+#complex_delimiter_level_2=\\:
+
+#timestamp type column's data format
+#dateformat=
+
+#csv data whether support multiline
+#multiline=false
+
+#max number of csv file columns
+#maxColumns=
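
For reference, a minimal sketch of reading this template from client code, assuming plain java.util.Properties semantics (TableLoader's actual parsing may differ):

    import java.io.FileInputStream
    import java.util.Properties

    // Read the data-load options; commented-out keys stay unset,
    // so supply defaults explicitly when looking them up.
    val props = new Properties()
    val in = new FileInputStream("conf/dataload.properties.template")
    try props.load(in) finally in.close()
    val delimiter = props.getProperty("delimiter", ",")
    val useKettle = props.getProperty("use_kettle", "true").toBoolean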

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 4f6d223..c2f0b9a 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -33,13 +33,6 @@
     <dev.path>${basedir}/../dev</dev.path>
   </properties>
 
-  <repositories>
-    <repository>
-      <id>pentaho-releases</id>
-      <url>http://repository.pentaho.org/artifactory/repo/</url>
-    </repository>
-  </repositories>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
index e8c437d..0489020 100644
--- a/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/GenerateDictionaryExample.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.examples
 import org.apache.spark.sql.{CarbonContext, CarbonEnv, CarbonRelation}
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -63,9 +63,8 @@ object GenerateDictionaryExample {
                       dictFolderPath: String) {
     val dataBaseName = carbonTableIdentifier.getDatabaseName
     val tableName = carbonTableIdentifier.getTableName
-    val carbonRelation = CarbonEnv.get.carbonMetastore.
-      lookupRelation1(Option(dataBaseName),
-        tableName) (cc).asInstanceOf[CarbonRelation]
+    val carbonRelation = CarbonEnv.get.carbonMetastore.lookupRelation1(Option(dataBaseName),
+        tableName)(cc).asInstanceOf[CarbonRelation]
     val carbonTable = carbonRelation.tableMeta.carbonTable
     val dimensions = carbonTable.getDimensionByTableName(tableName.toLowerCase())
       .toArray.map(_.asInstanceOf[CarbonDimension])

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/examples/spark2/src/main/resources/data.csv
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/resources/data.csv b/examples/spark2/src/main/resources/data.csv
new file mode 100644
index 0000000..5d3169e
--- /dev/null
+++ b/examples/spark2/src/main/resources/data.csv
@@ -0,0 +1,11 @@
+shortField,intField,bigintField,doubleField,stringField,timestampField
+1, 10, 100, 48.4, spark, 2015/4/23
+5, 17, 140, 43.4, spark, 2015/7/27
+1, 11, 100, 44.4, flink, 2015/5/23
+1, 10, 150, 43.4, spark, 2015/7/24
+1, 10, 100, 47.4, spark, 2015/7/23
+3, 14, 160, 43.4, hive, 2015/7/26
+2, 10, 100, 43.4, impala, 2015/7/23
+1, 10, 100, 43.4, spark, 2015/5/23
+4, 16, 130, 42.4, impala, 2015/7/23
+1, 10, 100, 43.4, spark, 2015/7/23

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
new file mode 100644
index 0000000..9102c78
--- /dev/null
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonExample.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.TableLoader
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+object CarbonExample {
+
+  def main(args: Array[String]): Unit = {
+    // To run the example, please change this path to your local machine's path
+    val rootPath = "/Users/wangfei/code/incubator-carbondata"
+    val spark = SparkSession
+        .builder()
+        .master("local")
+        .appName("CarbonExample")
+        .enableHiveSupport()
+        .config(CarbonCommonConstants.STORE_LOCATION,
+          s"$rootPath/examples/spark2/target/store")
+        .getOrCreate()
+    spark.sparkContext.setLogLevel("WARN")
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+    spark.sql("DROP TABLE IF EXISTS csv_table")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbon_table(
+         |    shortField short,
+         |    intField int,
+         |    bigintField long,
+         |    doubleField double,
+         |    stringField string
+         | )
+         | USING org.apache.spark.sql.CarbonSource
+       """.stripMargin)
+
+    val prop = s"$rootPath/conf/dataload.properties.template"
+    val tableName = "carbon_table"
+    val path = s"$rootPath/examples/spark2/src/main/resources/data.csv"
+    TableLoader.main(Array[String](prop, tableName, path))
+
+//    spark.sql(
+//      s"""
+//         | CREATE TABLE csv_table
+//         | (ID int,
+//         | date timestamp,
+//         | country string,
+//         | name string,
+//         | phonetype string,
+//         | serialname string,
+//         | salary int)
+//       """.stripMargin)
+//
+//    spark.sql(
+//      s"""
+//         | LOAD DATA LOCAL INPATH '$csvPath'
+//         | INTO TABLE csv_table
+//       """.stripMargin)
+
+//    spark.sql(
+//      s"""
+//         | INSERT INTO TABLE carbon_table
+//         | SELECT * FROM csv_table
+//       """.stripMargin)
+
+    // Perform a query
+//    spark.sql("""
+//           SELECT country, count(salary) AS amount
+//           FROM carbon_table
+//           WHERE country IN ('china','france')
+//           GROUP BY country
+//           """).show()
+
+    spark.sql("""
+             SELECT *
+             FROM carbon_table
+              """).show
+
+//    spark.sql("""
+//           SELECT sum(intField), stringField
+//           FROM carbon_table
+//           GROUP BY stringField
+//           """).show
+
+    // Drop table
+    spark.sql("DROP TABLE IF EXISTS carbon_table")
+    spark.sql("DROP TABLE IF EXISTS csv_table")
+  }
+}
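
The SELECT * query above can also be written against the DataFrame API; a minimal sketch, not part of this commit:

    // Equivalent DataFrame-API form of the SELECT * query (sketch).
    spark.table("carbon_table")
      .select("shortField", "intField", "stringField")
      .show()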

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop/pom.xml b/hadoop/pom.xml
index 53061fb..b04c4c6 100644
--- a/hadoop/pom.xml
+++ b/hadoop/pom.xml
@@ -33,13 +33,6 @@
     <dev.path>${basedir}/../dev</dev.path>
   </properties>
 
-  <repositories>
-    <repository>
-      <id>pentaho-releases</id>
-      <url>http://repository.pentaho.org/artifactory/repo/</url>
-    </repository>
-  </repositories>
-
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
index 95afd2f..8cd539f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonProjection.java
@@ -18,13 +18,16 @@
  */
 package org.apache.carbondata.hadoop;
 
+import java.io.Serializable;
 import java.util.LinkedHashSet;
 import java.util.Set;
 
 /**
  * User can add required columns
  */
-public class CarbonProjection {
+public class CarbonProjection implements Serializable {
+
+  private static final long serialVersionUID = -4328676723039530713L;
 
   private Set<String> columns = new LinkedHashSet<>();
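
Making CarbonProjection Serializable lets Spark ship the projection to executors along with the task. A quick round-trip sketch of what the change enables (addColumn is assumed from the existing class API):

    import java.io._
    import org.apache.carbondata.hadoop.CarbonProjection

    // Java-serialize and restore the projection, as Spark task shipping would.
    val projection = new CarbonProjection
    projection.addColumn("stringField")
    val buf = new ByteArrayOutputStream()
    new ObjectOutputStream(buf).writeObject(projection)
    val restored = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[CarbonProjection]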
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
index fa8ba6e..5eda4d8 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java
@@ -70,6 +70,8 @@ public abstract class AbstractDictionaryDecodedReadSupport<T> implements CarbonR
         } catch (CarbonUtilException e) {
           throw new RuntimeException(e);
         }
+      } else {
+        dataTypes[i] = carbonColumns[i].getDataType();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration-testcases/pom.xml
----------------------------------------------------------------------
diff --git a/integration-testcases/pom.xml b/integration-testcases/pom.xml
index 2fbe41b..737e628 100644
--- a/integration-testcases/pom.xml
+++ b/integration-testcases/pom.xml
@@ -32,14 +32,6 @@
   <properties>
     <dev.path>${basedir}/../dev</dev.path>
   </properties>
-
-  <repositories>
-    <repository>
-      <id>pentaho-releases</id>
-      <url>http://repository.pentaho.org/artifactory/repo/</url>
-    </repository>
-  </repositories>
-
   <dependencies>
     <dependency>
       <groupId>com.databricks</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index b0ab3ef..d3f42b4 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -35,11 +35,6 @@
 
   <dependencies>
     <dependency>
-      <groupId>com.databricks</groupId>
-      <artifactId>spark-csv_${scala.binary.version}</artifactId>
-      <version>1.2.0</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-common</artifactId>
       <version>${project.version}</version>
@@ -84,6 +79,11 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>spark-csv_${scala.binary.version}</artifactId>
+      <version>1.2.0</version>
+    </dependency>
+    <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
       <version>2.2.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
index ca33fac..753f485 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonCompactionUtil.java
@@ -38,7 +38,6 @@ import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 
-
 /**
  * Utility Class for the Compaction Flow.
  */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
index bb8fc5c..4b1958d 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -57,12 +57,19 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor
             break;
           default:
         }
-      } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      }
+      else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         //convert the long to timestamp in case of direct dictionary column
         if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
           data[i] = new Timestamp((long) data[i] / 1000);
         }
       }
+//      else if(dataTypes[i].equals(DataType.INT)) {
+//        data[i] = ((Long)(data[i])).intValue();
+//      }
+//        else if(dataTypes[i].equals(DataType.SHORT)) {
+//        data[i] = ((Double)(data[i])).shortValue();
+//      }
     }
     return new GenericRow(data);
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 1d8d6b2..319d85c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -37,8 +37,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
 import org.apache.carbondata.processing.constants.DataProcessorConstants
-import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
-import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
+import org.apache.carbondata.processing.csvreaderstep.{JavaRddIterator, RddInputUtils}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator
 import org.apache.carbondata.processing.model.CarbonLoadModel

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
index af349a8..77402b4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataLoadPartitionCoalescer.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.rdd
 
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet}
 import scala.collection.mutable
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, LinkedHashSet}
 
 import org.apache.spark.Partition
 import org.apache.spark.scheduler.TaskLocation

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 1c9d774..6766a39 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -22,8 +22,7 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 
-import org.apache.spark.sql.execution.command.ColumnProperty
-import org.apache.spark.sql.execution.command.Field
+import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.lcm.status.SegmentStatusManager

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index e650bfe..e578488 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -35,11 +35,11 @@ import org.apache.spark.util.FileUtils
 import org.apache.carbondata.common.factory.CarbonCommonFactory
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.cache.dictionary.Dictionary
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.carbon.path.CarbonStorePath
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.reader.CarbonDictionaryReader

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
index 79c0cc8..d607523 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst
 
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-
 /**
  * Implicit functions for [TableIdentifier]
  */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index e08660c..7368bff 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.spark.sql.hive
 
 import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
@@ -143,19 +144,15 @@ object DistributionUtil {
     nodes.distinct
   }
 
-  /**
-   * Requesting the extra executors other than the existing ones.
-   *
-   * @param sc
-   * @param numExecutors
-   * @return
-   */
+  // Hack for spark2 integration
+  var numExistingExecutors: Int = _
+
   def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
     sc.schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
-        val requiredExecutors = numExecutors - b.numExistingExecutors
-        LOGGER.info(s"number of executors is =$numExecutors existing executors are =" +
-            s"${ b.numExistingExecutors }")
+        val requiredExecutors = numExecutors - numExistingExecutors
+        LOGGER.info(s"number of executors is = $numExecutors existing executors are = " +
+                    s"$numExistingExecutors")
         if (requiredExecutors > 0) {
           b.requestExecutors(requiredExecutors)
         }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index 6476576..9492296 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -33,13 +33,6 @@
     <dev.path>${basedir}/../../dev</dev.path>
   </properties>
 
-  <repositories>
-    <repository>
-      <id>pentaho-releases</id>
-      <url>http://repository.pentaho.org/artifactory/repo/</url>
-    </repository>
-  </repositories>
-
   <dependencies>
     <dependency>
       <groupId>com.databricks</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index d898c4f..681c0c8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SchemaRDD
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.AttributeReference

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 6cfbd5f..c473253 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.hive.CarbonMetastore
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.sql.hive.{CarbonMetastore, DistributionUtil}
 
-/**
- * Carbon Environment for unified context
- */
 case class CarbonEnv(carbonMetastore: CarbonMetastore)
 
 object CarbonEnv {
@@ -35,6 +33,10 @@ object CarbonEnv {
       val cc = sqlContext.asInstanceOf[CarbonContext]
       val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
       carbonEnv = CarbonEnv(catalog)
+      DistributionUtil.numExistingExecutors = sqlContext.sparkContext.schedulerBackend match {
+        case b: CoarseGrainedSchedulerBackend => b.numExistingExecutors
+        case _ => 0
+      }
       initialized = true
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 3fe4f22..848e752 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -22,7 +22,6 @@ import java.util.ArrayList
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
index 4320598..c199dba 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
@@ -31,7 +31,8 @@ object CarbonSparkUtil {
 
   def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
     val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
-        .asScala.map(x => x.getColName) // wf : may be problem
+        .asScala.map(x => x.getColName)
+    // wf : may be problem
     val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
         .asScala.map(x => x.getColName)
     val dictionary =

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 33e7ae7..ee56067 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -35,6 +35,11 @@
 
   <dependencies>
     <dependency>
+      <groupId>com.databricks</groupId>
+      <artifactId>spark-csv_${scala.binary.version}</artifactId>
+      <version>${spark.csv.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-common</artifactId>
       <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
new file mode 100644
index 0000000..0926e1c
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFromMetadata.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Project Name  : Carbon
+ * Module Name   : CARBON Data Processor
+ * Author    : R00903928
+ * Created Date  : 21-Sep-2015
+ * FileName   : DeleteLoadFromMetadata.java
+ * Description   : Utility to delete load entries from table metadata
+ * Class Version  : 1.0
+ */
+package org.apache.carbondata.spark.load;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+public final class DeleteLoadFromMetadata {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteLoadFromMetadata.class.getName());
+
+  private DeleteLoadFromMetadata() {
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
new file mode 100644
index 0000000..04ef665
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.util;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.Partition;
+import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer;
+import org.apache.carbondata.spark.partition.api.impl.PartitionMultiFileImpl;
+import org.apache.carbondata.spark.partition.api.impl.QueryPartitionHelper;
+import org.apache.carbondata.spark.splits.TableSplit;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * This utility parses the Carbon query plan into the actual query model object.
+ */
+public final class CarbonQueryUtil {
+
+  private CarbonQueryUtil() {
+
+  }
+
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName,
+      CarbonQueryPlan queryPlan) throws IOException {
+
+    // Just create splits depending on the locations of the region servers
+    List<Partition> allPartitions = null;
+    if (queryPlan == null) {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName);
+    } else {
+      allPartitions =
+          QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan);
+    }
+    TableSplit[] splits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < splits.length; i++) {
+      splits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location =
+              QueryPartitionHelper.getInstance().getLocation(partition, databaseName, tableName);
+      locations.add(location);
+      splits[i].setPartition(partition);
+      splits[i].setLocations(locations);
+    }
+
+    return splits;
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception {
+
+    // Just create splits depending on the locations of the region servers
+    FileType fileType = FileFactory.getFileType(sourcePath);
+    DefaultLoadBalancer loadBalancer = null;
+    List<Partition> allPartitions = getAllFilesForDataLoad(sourcePath);
+    loadBalancer = new DefaultLoadBalancer(new ArrayList<String>(), allPartitions);
+    TableSplit[] tblSplits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < tblSplits.length; i++) {
+      tblSplits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = loadBalancer.getNodeForPartitions(partition);
+      locations.add(location);
+      tblSplits[i].setPartition(partition);
+      tblSplits[i].setLocations(locations);
+    }
+    return tblSplits;
+  }
+
+  /**
+   * It creates one split for each region server.
+   */
+  public static TableSplit[] getPartitionSplits(String sourcePath, String[] nodeList,
+      int partitionCount) throws Exception {
+
+    // Just create splits depending on the locations of the region servers
+    FileType fileType = FileFactory.getFileType(sourcePath);
+    DefaultLoadBalancer loadBalancer = null;
+    List<Partition> allPartitions = getAllPartitions(sourcePath, fileType, partitionCount);
+    loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions);
+    TableSplit[] splits = new TableSplit[allPartitions.size()];
+    for (int i = 0; i < splits.length; i++) {
+      splits[i] = new TableSplit();
+      List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+      Partition partition = allPartitions.get(i);
+      String location = loadBalancer.getNodeForPartitions(partition);
+      locations.add(location);
+      splits[i].setPartition(partition);
+      splits[i].setLocations(locations);
+    }
+    return splits;
+  }
+
+  public static void getAllFiles(String sourcePath, List<String> partitionsFiles, FileType fileType)
+      throws Exception {
+
+    if (!FileFactory.isFileExist(sourcePath, fileType, false)) {
+      throw new Exception("Source file doesn't exist at path: " + sourcePath);
+    }
+
+    CarbonFile file = FileFactory.getCarbonFile(sourcePath, fileType);
+    if (file.isDirectory()) {
+      CarbonFile[] fileNames = file.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile pathname) {
+          return true;
+        }
+      });
+      for (int i = 0; i < fileNames.length; i++) {
+        getAllFiles(fileNames[i].getPath(), partitionsFiles, fileType);
+      }
+    } else {
+      // add only csv files
+      if (file.getName().endsWith("csv")) {
+        partitionsFiles.add(file.getPath());
+      }
+    }
+  }
+
+  /**
+   * split sourcePath by comma
+   */
+  public static void splitFilePath(String sourcePath, List<String> partitionsFiles,
+      String separator) {
+    if (StringUtils.isNotEmpty(sourcePath)) {
+      String[] files = sourcePath.split(separator);
+      for (String file : files) {
+        partitionsFiles.add(file);
+      }
+    }
+  }
+
+  private static List<Partition> getAllFilesForDataLoad(String sourcePath) throws Exception {
+    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
+    List<Partition> partitionList =
+        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    Map<Integer, List<String>> partitionFiles = new HashMap<Integer, List<String>>();
+
+    partitionFiles.put(0, new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN));
+    partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0)));
+
+    for (int i = 0; i < files.size(); i++) {
+      partitionFiles.get(i % 1).add(files.get(i));
+    }
+    return partitionList;
+  }
+
+  private static List<Partition> getAllPartitions(String sourcePath, FileType fileType,
+      int partitionCount) throws Exception {
+    List<String> files = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA);
+    int[] numberOfFilesPerPartition = getNumberOfFilesPerPartition(files.size(), partitionCount);
+    int startIndex = 0;
+    int endIndex = 0;
+    List<Partition> partitionList =
+        new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+    if (numberOfFilesPerPartition != null) {
+      for (int i = 0; i < numberOfFilesPerPartition.length; i++) {
+        List<String> partitionFiles =
+            new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+        endIndex += numberOfFilesPerPartition[i];
+        for (int j = startIndex; j < endIndex; j++) {
+          partitionFiles.add(files.get(j));
+        }
+        startIndex += numberOfFilesPerPartition[i];
+        partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles));
+      }
+    }
+    return partitionList;
+  }
+
+  private static int[] getNumberOfFilesPerPartition(int numberOfFiles, int partitionCount) {
+    int div = numberOfFiles / partitionCount;
+    int mod = numberOfFiles % partitionCount;
+    int[] numberOfNodeToScan = null;
+    if (div > 0) {
+      numberOfNodeToScan = new int[partitionCount];
+      Arrays.fill(numberOfNodeToScan, div);
+    } else if (mod > 0) {
+      numberOfNodeToScan = new int[mod];
+    }
+    for (int i = 0; i < mod; i++) {
+      numberOfNodeToScan[i] = numberOfNodeToScan[i] + 1;
+    }
+    return numberOfNodeToScan;
+  }
+
+  public static List<String> getListOfSlices(LoadMetadataDetails[] details) {
+    List<String> slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    if (null != details) {
+      for (LoadMetadataDetails oneLoad : details) {
+        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(oneLoad.getLoadStatus())) {
+          String loadName = CarbonCommonConstants.LOAD_FOLDER + oneLoad.getLoadName();
+          slices.add(loadName);
+        }
+      }
+    }
+    return slices;
+  }
+
+  /**
+   * This method will clear the dictionary cache for a given map of columns and dictionary cache
+   * mapping
+   *
+   * @param columnToDictionaryMap
+   */
+  public static void clearColumnDictionaryCache(Map<String, Dictionary> columnToDictionaryMap) {
+    for (Map.Entry<String, Dictionary> entry : columnToDictionaryMap.entrySet()) {
+      CarbonUtil.clearDictionaryCache(entry.getValue());
+    }
+  }
+
+}
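
For intuition, getNumberOfFilesPerPartition spreads files as evenly as possible: 10 files over 4 partitions gives [3, 3, 2, 2], and with fewer files than partitions (3 files, 5 partitions) each file gets its own partition, [1, 1, 1]. The same logic in Scala (a sketch, not part of this commit):

    // Mirrors getNumberOfFilesPerPartition above.
    def filesPerPartition(files: Int, partitions: Int): Array[Int] = {
      val div = files / partitions
      val mod = files % partitions
      if (div > 0) Array.tabulate(partitions)(i => div + (if (i < mod) 1 else 0))
      else Array.fill(mod)(1)
    }
    // filesPerPartition(10, 4) => Array(3, 3, 2, 2)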

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
new file mode 100644
index 0000000..ea97bca
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+
+ /**
+  * Carbon column validator
+  */
+class CarbonColumnValidator extends ColumnValidator {
+  def validateColumns(allColumns: Seq[ColumnSchema]) {
+    allColumns.foreach { columnSchema =>
+      val colWithSameId = allColumns.filter { x =>
+        x.getColumnUniqueId.equals(columnSchema.getColumnUniqueId)
+      }
+      if (colWithSameId.size > 1) {
+        throw new MalformedCarbonCommandException("Two columns cannot have the same columnId")
+      }
+    }
+  }
+}
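
Intended usage: pass the full column list and the validator rejects duplicate column ids. A sketch, assuming ColumnSchema exposes a setColumnUniqueId setter (the setter is not shown in this diff):

    import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema

    val a = new ColumnSchema(); a.setColumnUniqueId("col-1") // setter assumed
    val b = new ColumnSchema(); b.setColumnUniqueId("col-1")
    // Throws MalformedCarbonCommandException: two columns share a columnId.
    new CarbonColumnValidator().validateColumns(Seq(a, b))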

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
new file mode 100644
index 0000000..2cd4eb7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonFilters.scala
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper}
+import org.apache.spark.sql.sources
+import org.apache.spark.sql.types.StructType
+
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn
+import org.apache.carbondata.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
+import org.apache.carbondata.scan.expression.conditional._
+import org.apache.carbondata.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * All filter conversions are done here.
+ */
+object CarbonFilters {
+
+  /**
+   * Converts data sources filters to carbon filter predicates.
+   */
+  def createCarbonFilter(schema: StructType,
+      predicate: sources.Filter): Option[CarbonExpression] = {
+    val dataTypeOf = schema.map(f => f.name -> f.dataType).toMap
+
+    def createFilter(predicate: sources.Filter): Option[CarbonExpression] = {
+      predicate match {
+
+        case sources.EqualTo(name, value) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.Not(sources.EqualTo(name, value)) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+
+        case sources.EqualNullSafe(name, value) =>
+          Some(new EqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.Not(sources.EqualNullSafe(name, value)) =>
+          Some(new NotEqualsExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+
+        case sources.GreaterThan(name, value) =>
+          Some(new GreaterThanExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.LessThan(name, value) =>
+          Some(new LessThanExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.GreaterThanOrEqual(name, value) =>
+          Some(new GreaterThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+        case sources.LessThanOrEqual(name, value) =>
+          Some(new LessThanEqualToExpression(getCarbonExpression(name),
+            getCarbonLiteralExpression(name, value)))
+
+        case sources.In(name, values) =>
+          Some(new InExpression(getCarbonExpression(name),
+            new ListExpression(
+              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+        case sources.Not(sources.In(name, values)) =>
+          Some(new NotInExpression(getCarbonExpression(name),
+            new ListExpression(
+              convertToJavaList(values.map(f => getCarbonLiteralExpression(name, f)).toList))))
+
+        case sources.And(lhs, rhs) =>
+          (createFilter(lhs) ++ createFilter(rhs)).reduceOption(new AndExpression(_, _))
+
+        case sources.Or(lhs, rhs) =>
+          for {
+            lhsFilter <- createFilter(lhs)
+            rhsFilter <- createFilter(rhs)
+          } yield {
+            new OrExpression(lhsFilter, rhsFilter)
+          }
+
+        case _ => None
+      }
+    }
+
+    def getCarbonExpression(name: String) = {
+      new CarbonColumnExpression(name,
+        CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+    }
+
+    def getCarbonLiteralExpression(name: String, value: Any): CarbonExpression = {
+      new CarbonLiteralExpression(value,
+        CarbonScalaUtil.convertSparkToCarbonDataType(dataTypeOf(name)))
+    }
+
+    createFilter(predicate)
+  }
+
+
+  // Check which filters can be pushed down to Carbon; the rest are handled in the Spark layer.
+  // Mostly only dimension filters are pushed down, since they are faster to evaluate in Carbon.
+  def selectFilters(filters: Seq[Expression],
+      attrList: java.util.HashSet[AttributeReferenceWrapper],
+      aliasMap: CarbonAliasDecoderRelation): Unit = {
+    def translate(expr: Expression, or: Boolean = false): Option[sources.Filter] = {
+      expr match {
+        case or@ Or(left, right) =>
+
+          val leftFilter = translate(left, or = true)
+          val rightFilter = translate(right, or = true)
+          if (leftFilter.isDefined && rightFilter.isDefined) {
+            Some( sources.Or(leftFilter.get, rightFilter.get))
+          } else {
+            or.collect {
+              case attr: AttributeReference =>
+                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+            None
+          }
+
+        case And(left, right) =>
+          (translate(left) ++ translate(right)).reduceOption(sources.And)
+
+        case EqualTo(a: Attribute, Literal(v, t)) =>
+          Some(sources.EqualTo(a.name, v))
+        case EqualTo(l@Literal(v, t), a: Attribute) =>
+          Some(sources.EqualTo(a.name, v))
+        case EqualTo(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.EqualTo(a.name, v))
+        case EqualTo(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.EqualTo(a.name, v))
+
+        case Not(EqualTo(a: Attribute, Literal(v, t))) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Literal(v, t), a: Attribute)) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) =>
+          Some(sources.Not(sources.EqualTo(a.name, v)))
+        case IsNotNull(a: Attribute) => Some(sources.IsNotNull(a.name))
+        case IsNull(a: Attribute) => Some(sources.IsNull(a.name))
+        case Not(In(a: Attribute, list)) if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.Not(sources.In(a.name, hSet.toArray)))
+        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.In(a.name, hSet.toArray))
+        case Not(In(Cast(a: Attribute, _), list))
+          if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.Not(sources.In(a.name, hSet.toArray)))
+        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          val hSet = list.map(e => e.eval(EmptyRow))
+          Some(sources.In(a.name, hSet.toArray))
+
+        case GreaterThan(a: Attribute, Literal(v, t)) =>
+          Some(sources.GreaterThan(a.name, v))
+        case GreaterThan(Literal(v, t), a: Attribute) =>
+          Some(sources.LessThan(a.name, v))
+        case GreaterThan(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.GreaterThan(a.name, v))
+        case GreaterThan(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.LessThan(a.name, v))
+
+        case LessThan(a: Attribute, Literal(v, t)) =>
+          Some(sources.LessThan(a.name, v))
+        case LessThan(Literal(v, t), a: Attribute) =>
+          Some(sources.GreaterThan(a.name, v))
+        case LessThan(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.LessThan(a.name, v))
+        case LessThan(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.GreaterThan(a.name, v))
+
+        case GreaterThanOrEqual(a: Attribute, Literal(v, t)) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+        case GreaterThanOrEqual(Literal(v, t), a: Attribute) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+        case GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+        case GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+
+        case LessThanOrEqual(a: Attribute, Literal(v, t)) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+        case LessThanOrEqual(Literal(v, t), a: Attribute) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+        case LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) =>
+          Some(sources.LessThanOrEqual(a.name, v))
+        case LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(sources.GreaterThanOrEqual(a.name, v))
+
+        case others =>
+          if (!or) {
+            others.collect {
+              case attr: AttributeReference =>
+                attrList.add(AttributeReferenceWrapper(aliasMap.getOrElse(attr, attr)))
+            }
+          }
+          None
+      }
+    }
+    filters.foreach(translate(_, false))
+  }
+
+  def processExpression(exprs: Seq[Expression],
+      attributesNeedToDecode: java.util.HashSet[AttributeReference],
+      unprocessedExprs: ArrayBuffer[Expression],
+      carbonTable: CarbonTable): Option[CarbonExpression] = {
+    def transformExpression(expr: Expression, or: Boolean = false): Option[CarbonExpression] = {
+      expr match {
+        case or @ Or(left, right) =>
+          val leftFilter = transformExpression(left, true)
+          val rightFilter = transformExpression(right, true)
+          if (leftFilter.isDefined && rightFilter.isDefined) {
+            Some(new OrExpression(leftFilter.get, rightFilter.get))
+          } else {
+            or.collect {
+              case attr: AttributeReference => attributesNeedToDecode.add(attr)
+            }
+            unprocessedExprs += or
+            None
+          }
+
+        case And(left, right) =>
+          (transformExpression(left) ++ transformExpression(right))
+            .reduceOption(new AndExpression(_, _))
+
+        case EqualTo(a: Attribute, l@Literal(v, t)) =>
+          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+        case EqualTo(l@Literal(v, t), a: Attribute) =>
+          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+        case EqualTo(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+        case EqualTo(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new EqualToExpression(transformExpression(a).get, transformExpression(l).get))
+
+        case Not(EqualTo(a: Attribute, l@Literal(v, t))) =>
+          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case Not(EqualTo(l@Literal(v, t), a: Attribute)) =>
+          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case Not(EqualTo(Cast(a: Attribute, _), l@Literal(v, t))) =>
+          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case Not(EqualTo(l@Literal(v, t), Cast(a: Attribute, _))) =>
+          Some(new NotEqualsExpression(transformExpression(a).get, transformExpression(l).get))
+        case IsNotNull(child: Attribute) =>
+          Some(new NotEqualsExpression(transformExpression(child).get,
+            transformExpression(Literal(null)).get, true))
+        case IsNull(child: Attribute) =>
+          Some(new EqualToExpression(transformExpression(child).get,
+            transformExpression(Literal(null)).get, true))
+        case Not(In(a: Attribute, list))
+          if !list.exists(!_.isInstanceOf[Literal]) =>
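+          // A NULL literal inside NOT IN can never evaluate to true, so the branch
+          // below degrades the whole predicate to a false expression (same handling
+          // as the Cast variant further down).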
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get,
+              new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+          }
+        case In(a: Attribute, list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          Some(new InExpression(transformExpression(a).get,
+            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+        case Not(In(Cast(a: Attribute, _), list))
+          if !list.exists(!_.isInstanceOf[Literal]) =>
+          /* If an illogical expression appears in a NOT IN filter, e.g.
+             NOT IN ('scala', NULL), it is treated as a false expression and
+             always returns no result. */
+          if (list.exists(x => isNullLiteral(x.asInstanceOf[Literal]))) {
+            Some(new FalseExpression(transformExpression(a).get))
+          } else {
+            Some(new NotInExpression(transformExpression(a).get, new ListExpression(
+              convertToJavaList(list.map(transformExpression(_).get)))))
+          }
+        case In(Cast(a: Attribute, _), list) if !list.exists(!_.isInstanceOf[Literal]) =>
+          Some(new InExpression(transformExpression(a).get,
+            new ListExpression(convertToJavaList(list.map(transformExpression(_).get)))))
+
+        case GreaterThan(a: Attribute, l@Literal(v, t)) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case GreaterThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case GreaterThan(l@Literal(v, t), a: Attribute) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case GreaterThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+
+        case LessThan(a: Attribute, l@Literal(v, t)) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case LessThan(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new LessThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case LessThan(l@Literal(v, t), a: Attribute) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+        case LessThan(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new GreaterThanExpression(transformExpression(a).get, transformExpression(l).get))
+
+        case GreaterThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case GreaterThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case GreaterThanOrEqual(l@Literal(v, t), a: Attribute) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case GreaterThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+
+        case LessThanOrEqual(a: Attribute, l@Literal(v, t)) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case LessThanOrEqual(Cast(a: Attribute, _), l@Literal(v, t)) =>
+          Some(new LessThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case LessThanOrEqual(l@Literal(v, t), a: Attribute) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+        case LessThanOrEqual(l@Literal(v, t), Cast(a: Attribute, _)) =>
+          Some(new GreaterThanEqualToExpression(transformExpression(a).get,
+            transformExpression(l).get))
+
+        case AttributeReference(name, dataType, _, _) =>
+          Some(new CarbonColumnExpression(name,
+            CarbonScalaUtil.convertSparkToCarbonDataType(
+              getActualCarbonDataType(name, carbonTable))))
+        case Literal(name, dataType) => Some(new
+            CarbonLiteralExpression(name, CarbonScalaUtil.convertSparkToCarbonDataType(dataType)))
+        case Cast(left, right) if !left.isInstanceOf[Literal] => transformExpression(left)
+        case others =>
+          if (!or) {
+            others.collect {
+              case attr: AttributeReference => attributesNeedToDecode.add(attr)
+            }
+            unprocessedExprs += others
+          }
+          None
+      }
+    }
+    exprs.flatMap(transformExpression(_, false)).reduceOption(new AndExpression(_, _))
+  }
+
+  // Note: the original grouping `(a && b && c) || d` would NPE on a null input and
+  // mis-associate the disjunction; the intended check is `a && b && (c || d)`.
+  private def isNullLiteral(exp: Expression): Boolean = {
+    null != exp && exp.isInstanceOf[Literal] &&
+      (exp.asInstanceOf[Literal].dataType == org.apache.spark.sql.types.DataTypes.NullType ||
+        exp.asInstanceOf[Literal].value == null)
+  }
+
+  private def getActualCarbonDataType(column: String, carbonTable: CarbonTable) = {
+    var carbonColumn: CarbonColumn =
+      carbonTable.getDimensionByName(carbonTable.getFactTableName, column)
+    val dataType = if (carbonColumn != null) {
+      carbonColumn.getDataType
+    } else {
+      carbonColumn = carbonTable.getMeasureByName(carbonTable.getFactTableName, column)
+      carbonColumn.getDataType match {
+        case DataType.INT => DataType.LONG
+        case DataType.LONG => DataType.LONG
+        case DataType.DECIMAL => DataType.DECIMAL
+        case _ => DataType.DOUBLE
+      }
+    }
+    CarbonScalaUtil.convertCarbonToSparkDataType(dataType)
+  }
+
+  // Convert a Scala list to a Java list. scalaList.asJava is not used here because,
+  // during deserialization, the asJava wrapper class cannot be found and a
+  // ClassNotFoundException is thrown.
+  private def convertToJavaList(
+      scalaList: Seq[CarbonExpression]): java.util.List[CarbonExpression] = {
+    val javaList = new java.util.ArrayList[CarbonExpression]()
+    scalaList.foreach(javaList.add)
+    javaList
+  }
+}

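For reference, the `createFilter` helper above maps Spark data source filters onto
Carbon expressions one node at a time; any filter it cannot map falls through to
`case _ => None` and is left for Spark to evaluate after the scan. A minimal,
hypothetical sketch of the translation (column names, types, and values are made up):

    import org.apache.spark.sql.sources

    // the predicate Spark pushes into the relation
    val predicate: sources.Filter =
      sources.And(sources.EqualTo("city", "NY"), sources.GreaterThan("age", 18))

    // the routine above would yield the equivalent Carbon tree:
    //   AndExpression(
    //     EqualToExpression(ColumnExpression("city"), LiteralExpression("NY")),
    //     GreaterThanExpression(ColumnExpression("age"), LiteralExpression(18)))
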
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
new file mode 100644
index 0000000..5f0c7e3
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
+/**
+ * Contains all options for Spark data source
+ */
+class CarbonOption(options: Map[String, String]) {
+  def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
+
+  def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+
+  def tableName: String = options.getOrElse("tableName", "default_table")
+
+  def tableId: String = options.getOrElse("tableId", "default_table_id")
+
+  def tablePath: String = s"$dbName/$tableName"
+
+  def partitionCount: String = options.getOrElse("partitionCount", "1")
+
+  def partitionClass: String = {
+    options.getOrElse("partitionClass",
+      "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
+  }
+
+  def tempCSV: Boolean = options.getOrElse("tempCSV", "true").toBoolean
+
+  def compress: Boolean = options.getOrElse("compress", "false").toBoolean
+
+  def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean
+}
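A minimal sketch of how these options resolve (the option values are hypothetical,
and it assumes the `tableIdentifier` key fix above):

    import org.apache.carbondata.spark.CarbonOption

    val opts = new CarbonOption(Map("dbName" -> "sales", "tableName" -> "orders"))
    opts.tableIdentifier   // "sales.orders" (falls back to dbName.tableName)
    opts.tablePath         // "sales/orders"
    opts.tempCSV           // true (default)
    opts.useKettle         // true (default)
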

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
new file mode 100644
index 0000000..7618558
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonSparkFactory.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
+
+
+/**
+ * Column validator
+ */
+trait ColumnValidator {
+  def validateColumns(columns: Seq[ColumnSchema])
+}
+
+/**
+ * Dictionary related helper service
+ */
+trait DictionaryDetailService {
+  def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier, storePath: String): DictionaryDetail
+}
+
+/**
+ * Dictionary related detail
+ */
+case class DictionaryDetail(columnIdentifiers: Array[ColumnIdentifier],
+    dictFilePaths: Array[String], dictFileExists: Array[Boolean])
+
+/**
+ * Factory class
+ */
+object CarbonSparkFactory {
+  /**
+   * @return column validator
+   */
+  def getCarbonColumnValidator(): ColumnValidator = {
+    new CarbonColumnValidator
+  }
+
+  /**
+   * @return dictionary helper
+   */
+  def getDictionaryDetailService(): DictionaryDetailService = {
+    new DictionaryDetailHelper
+  }
+}
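A minimal, hypothetical sketch of the factory in use (the column schemas and
dictionary arguments are assumed to come from the surrounding load flow):

    val validator = CarbonSparkFactory.getCarbonColumnValidator()
    validator.validateColumns(columnSchemas)

    val detail = CarbonSparkFactory.getDictionaryDetailService()
      .getDictionaryDetail(dictFolderPath, primDimensions, tableIdentifier, storePath)
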

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
new file mode 100644
index 0000000..52457b8
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/DictionaryDetailHelper.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark
+
+import scala.collection.mutable.HashMap
+
+import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.datastorage.store.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory
+
+class DictionaryDetailHelper extends DictionaryDetailService {
+  def getDictionaryDetail(dictFolderPath: String, primDimensions: Array[CarbonDimension],
+      table: CarbonTableIdentifier, storePath: String): DictionaryDetail = {
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, table)
+    val dictFilePaths = new Array[String](primDimensions.length)
+    val dictFileExists = new Array[Boolean](primDimensions.length)
+    val columnIdentifier = new Array[ColumnIdentifier](primDimensions.length)
+
+    val fileType = FileFactory.getFileType(dictFolderPath)
+    // 1. list all dictionary files in the metadata folder once, so existence can be
+    // checked from memory instead of stat-ing the file system per dimension
+    val metadataDirectory = FileFactory.getCarbonFile(dictFolderPath, fileType)
+    val carbonFiles = metadataDirectory.listFiles(new CarbonFileFilter {
+      override def accept(pathname: CarbonFile): Boolean = {
+        CarbonTablePath.isDictionaryFile(pathname)
+      }
+    })
+    // 2. put dictionary file names into fileNamesMap
+    val fileNamesMap = new HashMap[String, Int]
+    for (i <- 0 until carbonFiles.length) {
+      fileNamesMap.put(carbonFiles(i).getName, i)
+    }
+    // 3. look up fileNamesMap: a dictionary file exists iff its name is present
+    primDimensions.zipWithIndex.foreach { f =>
+      columnIdentifier(f._2) = f._1.getColumnIdentifier
+      dictFilePaths(f._2) = carbonTablePath.getDictionaryFilePath(f._1.getColumnId)
+      dictFileExists(f._2) =
+        fileNamesMap.get(CarbonTablePath.getDictionaryFileName(f._1.getColumnId)) match {
+          case None => false
+          case Some(_) => true
+        }
+    }
+
+    DictionaryDetail(columnIdentifier, dictFilePaths, dictFileExists)
+  }
+}
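The exists check above deliberately lists the metadata directory once and then
probes a HashMap, rather than issuing one file system call per dimension. A
self-contained sketch of the same pattern with plain strings:

    import scala.collection.mutable.HashMap

    val listed = Seq("col1.dict", "col3.dict")          // one directory listing
    val index  = HashMap(listed.zipWithIndex: _*)       // file name -> position
    val wanted = Seq("col1.dict", "col2.dict", "col3.dict")
    val exists = wanted.map(index.contains)             // Seq(true, false, true)
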

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/590ecceb/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
new file mode 100644
index 0000000..254052b
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Simple key-value adapter classes that shape the results returned by the
+ * carbon RDD classes.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.carbondata.core.load.LoadMetadataDetails
+
+trait Value[V] extends Serializable {
+  def getValue(value: Array[Object]): V
+}
+
+class ValueImpl extends Value[Array[Object]] {
+  override def getValue(value: Array[Object]): Array[Object] = value
+}
+
+trait RawValue[V] extends Serializable {
+  def getValue(value: Array[Any]): V
+}
+
+class RawValueImpl extends RawValue[Array[Any]] {
+  override def getValue(value: Array[Any]): Array[Any] = value
+}
+
+trait DataLoadResult[K, V] extends Serializable {
+  def getKey(key: String, value: LoadMetadataDetails): (K, V)
+}
+
+class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
+  override def getKey(key: String, value: LoadMetadataDetails): (String, LoadMetadataDetails) = {
+    (key, value)
+  }
+}
+
+trait PartitionResult[K, V] extends Serializable {
+  def getKey(key: Int, value: Boolean): (K, V)
+}
+
+class PartitionResultImpl extends PartitionResult[Int, Boolean] {
+  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
+
+trait MergeResult[K, V] extends Serializable {
+  def getKey(key: Int, value: Boolean): (K, V)
+}
+
+class MergeResultImpl extends MergeResult[Int, Boolean] {
+  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
+
+trait DeletedLoadResult[K, V] extends Serializable {
+  def getKey(key: String, value: String): (K, V)
+}
+
+class DeletedLoadResultImpl extends DeletedLoadResult[String, String] {
+  override def getKey(key: String, value: String): (String, String) = (key, value)
+}
+
+trait RestructureResult[K, V] extends Serializable {
+  def getKey(key: Int, value: Boolean): (K, V)
+}
+
+class RestructureResultImpl extends RestructureResult[Int, Boolean] {
+  override def getKey(key: Int, value: Boolean): (Int, Boolean) = (key, value)
+}
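
A minimal sketch of how these adapters are meant to be used inside an RDD's
compute method (the partition id and status flag are hypothetical):

    import org.apache.carbondata.spark.MergeResultImpl

    val mergeResult = new MergeResultImpl()
    val out: (Int, Boolean) = mergeResult.getKey(0, true)   // (partitionId, success)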