You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/09/14 16:26:21 UTC

[1/2] incubator-carbondata git commit: [CARBONDATA-212] Use SQLContext to read CarbonData files

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master e0d1bad52 -> 57ea8c5e8


[CARBONDATA-212] Use SQLContext to read CarbonData files


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/528c2bef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/528c2bef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/528c2bef

Branch: refs/heads/master
Commit: 528c2bef86bda2bb62dd29c53a229ebb515fe8a9
Parents: e0d1bad
Author: Jacky Li <ja...@huawei.com>
Authored: Thu Sep 15 00:23:12 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Sep 15 00:23:12 2016 +0800

----------------------------------------------------------------------
 .../core/carbon/AbsoluteTableIdentifier.java    |  15 ++
 .../core/carbon/path/CarbonTablePath.java       |  10 +-
 .../carbondata/examples/DatasourceExample.scala |  54 ++++++++
 .../examples/util/InitForExamples.scala         |   4 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   9 +-
 .../carbondata/hadoop/util/SchemaReader.java    |  25 ++--
 .../spark/implicit/DataFrameFuncs.scala         | 122 ++++++++++++++++
 .../org/apache/carbondata/spark/package.scala   | 113 +--------------
 .../carbondata/spark/util/QueryPlanUtil.scala   |  12 ++
 .../sql/CarbonDatasourceHadoopRelation.scala    | 138 +++++++++++--------
 .../spark/sql/CarbonDatasourceRelation.scala    |  82 +++++------
 .../org/apache/spark/sql/CarbonOperators.scala  |   6 +-
 .../dataload/SparkDatasourceSuite.scala         |  88 ++++++++++--
 .../HadoopFSRelationTestCase.scala              |   4 +-
 14 files changed, 437 insertions(+), 245 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
index c8f603a..0e1481d 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
@@ -63,6 +63,21 @@ public class AbsoluteTableIdentifier implements Serializable {
     return carbonTableIdentifier;
   }
 
+  public static AbsoluteTableIdentifier fromTablePath(String tablePath) {
+    String[] names = tablePath.replace('\\', '/').split("/");
+    if (names.length < 3) {
+      throw new IllegalArgumentException("invalid table path: " + tablePath);
+    }
+
+    String tableName = names[names.length - 1];
+    String dbName = names[names.length - 2];
+    String storePath = tablePath.substring(0, tablePath.lastIndexOf(dbName));
+
+    CarbonTableIdentifier identifier =
+        new CarbonTableIdentifier(dbName, tableName, Long.toString(System.currentTimeMillis()));
+    return new AbsoluteTableIdentifier(storePath, identifier);
+  }
+
   /**
    * to get the hash code
    */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index 80a39f1..99531e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -30,7 +30,6 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.INVALID
 
 import org.apache.hadoop.fs.Path;
 
-
 /**
  * Helps to get Table content paths.
  */
@@ -185,6 +184,15 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * return the schema file path
+   * @param tablePath path to table files
+   * @return schema file path
+   */
+  public static String getSchemaFilePath(String tablePath) {
+    return tablePath + File.separator + METADATA_DIR + File.separator + SCHEMA_FILE;
+  }
+
+  /**
    * @return absolute path of table status file
    */
   public String getTableStatusFilePath() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
new file mode 100644
index 0000000..f19dc13
--- /dev/null
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.spark.sql.{SaveMode, SQLContext}
+
+import org.apache.carbondata.examples.util.InitForExamples
+
+object DatasourceExample {
+
+  def main(args: Array[String]) {
+    // use CarbonContext to write CarbonData files
+    val cc = InitForExamples.createCarbonContext("DatasourceExample")
+    import cc.implicits._
+    val sc = cc.sparkContext
+    // create a dataframe, it can be from parquet or hive table
+    val df = sc.parallelize(1 to 1000)
+               .map(x => ("a", "b", x))
+               .toDF("c1", "c2", "c3")
+
+    // save dataframe to CarbonData files
+    df.write
+      .format("carbondata")
+      .option("tableName", "table1")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    // Use SQLContext to read CarbonData files
+    val sqlContext = new SQLContext(sc)
+    sqlContext.sql(
+      """
+        | CREATE TEMPORARY TABLE source
+        | USING org.apache.spark.sql.CarbonSource
+        | OPTIONS (path './examples/target/store/default/table1')
+      """.stripMargin)
+    sqlContext.sql("SELECT c1, c2, count(*) FROM source WHERE c3 > 100 GROUP BY c1, c2").show
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
index 21377d7..46d2bc1 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
@@ -35,8 +35,8 @@ object InitForExamples {
 
   def createCarbonContext(appName: String): CarbonContext = {
     val sc = new SparkContext(new SparkConf()
-      .setAppName(appName)
-      .setMaster("local[2]"))
+          .setAppName(appName)
+          .setMaster("local[2]"))
     sc.setLogLevel("ERROR")
 
     println(s"Starting $appName using spark version ${sc.version}")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index da697ad..f6fae6c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -149,9 +149,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
     String carbonTableStr = configuration.get(CARBON_TABLE);
     if (carbonTableStr == null) {
-      CarbonTable carbonTable = new SchemaReader()
-          .readCarbonTableFromStore(getTablePath(configuration), getTableToAccess(configuration),
-              getStorePathString(configuration));
+      // read it from schema file in the store
+      String storePath = configuration.get(INPUT_DIR, "");
+      AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(
+          storePath, getTableToAccess(configuration));
+      CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(
+          getTablePath(configuration), identifier);
       setCarbonTable(configuration, carbonTable);
       return carbonTable;
     }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index f108b0a..e8c088f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.hadoop.util;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
 import org.apache.carbondata.core.carbon.metadata.converter.SchemaConverter;
@@ -33,16 +34,17 @@ import org.apache.carbondata.core.reader.ThriftReader;
 import org.apache.thrift.TBase;
 
 /**
- * TODO: It should be removed after store manager implemetation.
+ * TODO: It should be removed after store manager implementation.
  */
 public class SchemaReader {
 
-  public CarbonTable readCarbonTableFromStore(CarbonTablePath carbonTablePath,
-      CarbonTableIdentifier tableIdentifier, String storePath) throws IOException {
+  public static CarbonTable readCarbonTableFromStore(CarbonTablePath carbonTablePath,
+      AbsoluteTableIdentifier identifier) throws IOException {
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
-    if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS)
-          || FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
-      String tableName = tableIdentifier.getTableName();
+    if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+        FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
+      String tableName = identifier.getCarbonTableIdentifier().getTableName();
 
       ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
         public TBase create() {
@@ -58,12 +60,15 @@ public class SchemaReader {
 
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
       TableInfo wrapperTableInfo = schemaConverter
-          .fromExternalToWrapperTableInfo(tableInfo, tableIdentifier.getDatabaseName(), tableName,
-              storePath);
+          .fromExternalToWrapperTableInfo(tableInfo,
+              identifier.getCarbonTableIdentifier().getDatabaseName(), tableName,
+              identifier.getStorePath());
       wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath));
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
-      return CarbonMetadata.getInstance().getCarbonTable(tableIdentifier.getTableUniqueName());
+      return CarbonMetadata.getInstance().getCarbonTable(
+          identifier.getCarbonTableIdentifier().getTableUniqueName());
+    } else {
+      return null;
     }
-    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
new file mode 100644
index 0000000..d5185cc
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonContext, DataFrame, SaveMode}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
+
+class DataFrameFuncs(dataFrame: DataFrame) {
+
+  /**
+   * Saves DataFrame as CarbonData files.
+   */
+  def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    // To avoid derby problem, dataframe need to be writen and read using CarbonContext
+    require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
+      "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
+    )
+
+    val options = new CarbonOption(parameters)
+    val tableName = options.tableName
+    val dbName = options.dbName
+
+    // temporary solution: write to csv file, then load the csv into carbon
+    val tempCSVFolder = s"./tempCSV"
+    dataFrame.write
+        .format(csvPackage)
+        .option("header", "true")
+        .mode(SaveMode.Overwrite)
+        .save(tempCSVFolder)
+
+    val sqlContext = dataFrame.sqlContext
+    val tempCSVPath = new Path(tempCSVFolder)
+    val fs = tempCSVPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+    try {
+      sqlContext.sql(makeCreateTableString(dataFrame.schema, options))
+
+      // add 'csv' as file extension to all generated part file
+      val itor = fs.listFiles(tempCSVPath, true)
+      while (itor.hasNext) {
+        val f = itor.next()
+        if (f.getPath.getName.startsWith("part-")) {
+          val newPath = s"${ f.getPath.getParent }/${ f.getPath.getName }.csv"
+          if (!fs.rename(f.getPath, new Path(newPath))) {
+            sqlContext.sql(s"DROP TABLE IF EXISTS $dbName.$tableName")
+            throw new RuntimeException("File system rename failed when loading data into carbon")
+          }
+        }
+      }
+      sqlContext.sql(makeLoadString(tempCSVFolder, options))
+    } finally {
+      fs.delete(tempCSVPath, true)
+    }
+  }
+
+  private def csvPackage: String = "com.databricks.spark.csv"
+
+  private def convertToCarbonType(sparkType: DataType): String = {
+    sparkType match {
+      case StringType => CarbonType.STRING.name
+      case IntegerType => CarbonType.INT.name
+      case ByteType => CarbonType.INT.name
+      case ShortType => CarbonType.SHORT.name
+      case LongType => CarbonType.LONG.name
+      case FloatType => CarbonType.DOUBLE.name
+      case DoubleType => CarbonType.DOUBLE.name
+      case BooleanType => CarbonType.DOUBLE.name
+      case TimestampType => CarbonType.TIMESTAMP.name
+      case other => sys.error(s"unsupported type: $other")
+    }
+  }
+
+  private def makeCreateTableString(schema: StructType, option: CarbonOption): String = {
+    val tableName = option.tableName
+    val dbName = option.dbName
+    val carbonSchema = schema.map { field =>
+      s"${ field.name } ${ convertToCarbonType(field.dataType) }"
+    }
+    s"""
+          CREATE TABLE IF NOT EXISTS $dbName.$tableName
+          (${ carbonSchema.mkString(", ") })
+          STORED BY '${ CarbonContext.datasourceName }'
+      """
+  }
+
+  private def makeLoadString(csvFolder: String, option: CarbonOption): String = {
+    val tableName = option.tableName
+    val dbName = option.dbName
+    s"""
+          LOAD DATA INPATH '$csvFolder'
+          INTO TABLE $dbName.$tableName
+      """
+  }
+
+  def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+    // find out table
+    // find out streaming segment
+    // for each rdd partition, find out the appendable carbon file
+    // check whether it is full
+    // if full, create new file
+    // append to it: create blocklet header and data, call thrift to convert, write hdfs
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
index b46af53..b1b91e7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
@@ -17,116 +17,11 @@
 
 package org.apache.carbondata
 
-import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.sql._
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.DataFrame
 
-import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
-
-package object spark extends Logging {
-
-  implicit class toCarbonDataFrame(dataFrame: DataFrame) {
-
-    /**
-     * Saves DataFrame as carbon files.
-     */
-    def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
-      // To avoid derby problem, dataframe need to be writen and read using CarbonContext
-      require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
-        "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
-      )
-
-      val storePath = dataFrame.sqlContext.asInstanceOf[CarbonContext].storePath
-      val options = new CarbonOption(parameters)
-      val dbName = options.dbName
-      val tableName = options.tableName
-
-      // temporary solution: write to csv file, then load the csv into carbon
-      val tempCSVFolder = s"$storePath/$dbName/$tableName/tempCSV"
-      var writer: DataFrameWriter =
-        dataFrame.write
-                 .format(csvPackage)
-                 .option("header", "false")
-                 .mode(SaveMode.Overwrite)
-
-      if (options.compress.equals("true")) {
-        writer = writer.option("codec", "gzip")
-      }
-
-      writer.save(tempCSVFolder)
-
-      val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
-      val tempCSVPath = new Path(tempCSVFolder)
-      val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
-
-      def countSize(): Double = {
-        var size: Double = 0
-        val itor = fs.listFiles(tempCSVPath, true)
-        while (itor.hasNext) {
-          val f = itor.next()
-          if (f.getPath.getName.startsWith("part-")) {
-            size += f.getLen
-          }
-        }
-        size
-      }
-
-      try {
-        cc.sql(makeCreateTableString(dataFrame.schema, options))
-        logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
-        cc.sql(makeLoadString(tableName, tempCSVFolder))
-      } finally {
-        fs.delete(tempCSVPath, true)
-      }
-    }
-
-    private def csvPackage: String = "com.databricks.spark.csv.newapi"
-
-    private def convertToCarbonType(sparkType: DataType): String = {
-      sparkType match {
-        case StringType => CarbonType.STRING.name
-        case IntegerType => CarbonType.INT.name
-        case ByteType => CarbonType.INT.name
-        case ShortType => CarbonType.SHORT.name
-        case LongType => CarbonType.LONG.name
-        case FloatType => CarbonType.DOUBLE.name
-        case DoubleType => CarbonType.DOUBLE.name
-        case BooleanType => CarbonType.DOUBLE.name
-        case TimestampType => CarbonType.TIMESTAMP.name
-        case other => sys.error(s"unsupported type: $other")
-      }
-    }
-
-    private def makeCreateTableString(schema: StructType, option: CarbonOption): String = {
-      val tableName = option.tableName
-      val carbonSchema = schema.map { field =>
-        s"${ field.name } ${ convertToCarbonType(field.dataType) }"
-      }
-      s"""
-          CREATE TABLE IF NOT EXISTS $tableName
-          (${ carbonSchema.mkString(", ") })
-          STORED BY '${ CarbonContext.datasourceName }'
-      """
-    }
-
-    private def makeLoadString(tableName: String, csvFolder: String): String = {
-      s"""
-          LOAD DATA INPATH '$csvFolder'
-          INTO TABLE $tableName
-          OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
-      """
-    }
-
-    def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
-      // find out table
-      // find out streaming segment
-      // for each rdd partition, find out the appendable carbon file
-      // check whether it is full
-      // if full, create new file
-      // append to it: create blocklet header and data, call thrift to convert, write hdfs
-    }
+package object spark {
 
+  implicit def toDataFrameFuncs(dataFrame: DataFrame): DataFrameFuncs = {
+    new DataFrameFuncs(dataFrame)
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 84b4c29..4dfcd3f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.util
 
+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
@@ -26,6 +28,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
 import org.apache.carbondata.hadoop.CarbonInputFormat
 
+
 /**
  * All the utility functions for carbon plan creation
  */
@@ -44,4 +47,13 @@ object QueryPlanUtil {
       absoluteTableIdentifier.getCarbonTableIdentifier)
     (carbonInputFormat, job)
   }
+
+  def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
+      conf: Configuration) : CarbonInputFormat[V] = {
+    val carbonInputFormat = new CarbonInputFormat[V]()
+    val job: Job = new Job(conf)
+    FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getStorePath))
+    CarbonInputFormat.setTableToAccess(conf, absoluteTableIdentifier.getCarbonTableIdentifier)
+    carbonInputFormat
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index cf82072..7a8397c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -20,75 +20,89 @@ package org.apache.spark.sql
 import java.text.SimpleDateFormat
 import java.util.Date
 
-import scala.reflect.ClassTag
-
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.scan.expression.logical.AndExpression
+import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
+import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
+import org.apache.carbondata.spark.util.QueryPlanUtil
+import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, JobID}
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.{Job, JobID}
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, DistributionUtil, TableMeta}
+import org.apache.spark.sql.hive.{DistributionUtil, TableMeta}
 import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.scan.expression.logical.AndExpression
-import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
-import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
-import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
+import scala.reflect.ClassTag
 
-private[sql] case class CarbonDatasourceHadoopRelation(sqlContext: SQLContext,
+private[sql] case class CarbonDatasourceHadoopRelation(
+    sqlContext: SQLContext,
     paths: Array[String],
-    parameters: Map[String, String])
-  extends HadoopFsRelation {
-
-  val (carbonRelation, jobConf) = {
-    val options = new CarbonOption(parameters)
-    val job: Job = new Job(new JobConf())
-    FileInputFormat.setInputPaths(job, paths.head)
-    val identifier = new CarbonTableIdentifier(options.dbName, options.tableName, options.tableId)
-    CarbonInputFormat.setTableToAccess(job.getConfiguration, identifier)
-    val table = CarbonInputFormat.getCarbonTable(job.getConfiguration)
-    if(table == null) {
-      sys.error(s"Store path ${paths.head} is not valid or " +
-                s"table ${identifier.getTableUniqueName}  does not exist in path.")
+    parameters: Map[String, String],
+    tableSchema: Option[StructType])
+    extends HadoopFsRelation {
+
+  lazy val schemaPath = new Path(CarbonTablePath.getSchemaFilePath(paths.head))
+  if (!schemaPath.getFileSystem(new Configuration).exists(schemaPath)) {
+    throw new IllegalArgumentException("invalid CarbonData file path: " + paths.head)
+  }
+
+  lazy val job = new Job(new JobConf())
+  lazy val options = new CarbonOption(parameters)
+  lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
+  lazy val identifier = absIdentifier.getCarbonTableIdentifier
+  lazy val relationRaw: CarbonRelation = {
+    val carbonTable = SchemaReader.readCarbonTableFromStore(
+      CarbonStorePath.getCarbonTablePath(absIdentifier.getStorePath, identifier),
+      absIdentifier)
+    if (carbonTable == null) {
+      sys.error(s"CarbonData file path ${paths.head} is not valid")
     }
-    val relation = CarbonRelation(table.getDatabaseName,
-      table.getFactTableName,
-      CarbonSparkUtil.createSparkMeta(table),
+    CarbonRelation(
+      carbonTable.getDatabaseName,
+      carbonTable.getFactTableName,
+      CarbonSparkUtil.createSparkMeta(carbonTable),
       TableMeta(identifier,
         paths.head,
-        table,
+        carbonTable,
         Partitioner(options.partitionClass,
           Array(""),
           options.partitionCount.toInt,
-          DistributionUtil.getNodeList(sqlContext.sparkContext))),
-      None)(sqlContext)
-
-    (relation, job.getConfiguration)
+          DistributionUtil.getNodeList(sqlContext.sparkContext)
+        )
+      ),
+      None
+    )(sqlContext)
   }
 
-  def dataSchema: StructType = carbonRelation.schema
+  override def dataSchema: StructType = tableSchema.getOrElse(relationRaw.schema)
 
   override def prepareJobForWrite(job: Job): OutputWriterFactory = {
-    // TODO: implement it
+    // TODO
     throw new UnsupportedOperationException
   }
 
-  override def buildScan(requiredColumns: Array[String],
+  override def buildScan(
+      requiredColumns: Array[String],
       filters: Array[Filter],
       inputFiles: Array[FileStatus]): RDD[Row] = {
-    val conf = new Configuration(jobConf)
-    filters.flatMap(f => CarbonFilters.createCarbonFilter(dataSchema, f))
-      .reduceOption(new AndExpression(_, _))
-      .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+    val conf = new Configuration(job.getConfiguration)
+    filters.flatMap { filter =>
+        CarbonFilters.createCarbonFilter(dataSchema, filter)
+      }.reduceOption(new AndExpression(_, _))
+        .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+
     val projection = new CarbonProjection
     requiredColumns.foreach(projection.addColumn)
     CarbonInputFormat.setColumnProjection(projection, conf)
@@ -96,14 +110,17 @@ private[sql] case class CarbonDatasourceHadoopRelation(sqlContext: SQLContext,
 
     new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
       new SerializableConfiguration(conf),
+      absIdentifier,
       classOf[CarbonInputFormat[Row]],
-      classOf[Row])
+      classOf[Row]
+    )
   }
+
 }
 
 class CarbonHadoopFSPartition(rddId: Int, val idx: Int,
     val carbonSplit: SerializableWritable[CarbonInputSplit])
-  extends Partition {
+    extends Partition {
 
   override val index: Int = idx
 
@@ -113,40 +130,32 @@ class CarbonHadoopFSPartition(rddId: Int, val idx: Int,
 class CarbonHadoopFSRDD[V: ClassTag](
     @transient sc: SparkContext,
     conf: SerializableConfiguration,
+    identifier: AbsoluteTableIdentifier,
     inputFormatClass: Class[_ <: CarbonInputFormat[V]],
     valueClass: Class[V])
-  extends RDD[V](sc, Nil)
-    with SparkHadoopMapReduceUtil
-    with Logging {
+    extends RDD[V](sc, Nil)
+        with SparkHadoopMapReduceUtil
+        with Logging {
 
   private val jobTrackerId: String = {
     val formatter = new SimpleDateFormat("yyyyMMddHHmm")
     formatter.format(new Date())
   }
-
   @transient protected val jobId = new JobID(jobTrackerId, id)
 
-  override protected def getPartitions: Array[Partition] = {
-    val inputFormat = inputFormatClass.newInstance
-    val jobContext = newJobContext(conf.value, jobId)
-    val splits = inputFormat.getSplits(jobContext).toArray
-    val carbonInputSplits = splits
-      .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
-    carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
-  }
-
   @DeveloperApi
   override def compute(split: Partition,
       context: TaskContext): Iterator[V] = {
     val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
     val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
-    val inputFormat = inputFormatClass.newInstance
+    val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
+      hadoopAttemptContext.getConfiguration)
+    hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getStorePath)
     val reader =
       inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
         hadoopAttemptContext)
-    reader
-      .initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
-        hadoopAttemptContext)
+    reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
+      hadoopAttemptContext)
     new Iterator[V] {
       private[this] var havePair = false
       private[this] var finished = false
@@ -174,4 +183,15 @@ class CarbonHadoopFSRDD[V: ClassTag](
       }
     }
   }
+
+  override protected def getPartitions: Array[Partition] = {
+    val jobContext = newJobContext(conf.value, jobId)
+    val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
+      jobContext.getConfiguration)
+    jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getStorePath)
+    val splits = carbonInputFormat.getSplits(jobContext).toArray
+    val carbonInputSplits = splits
+        .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
+    carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 6240c7c..f219ef5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -42,11 +42,8 @@ import org.apache.carbondata.spark.{CarbonOption, _}
  * Carbon relation provider compliant to data source api.
  * Creates carbon relations
  */
-class CarbonSource
-  extends RelationProvider
-    with CreatableRelationProvider
-    with HadoopFsRelationProvider
-    with DataSourceRegister {
+class CarbonSource extends RelationProvider
+    with CreatableRelationProvider with HadoopFsRelationProvider with DataSourceRegister {
 
   override def shortName(): String = "carbondata"
 
@@ -61,7 +58,7 @@ class CarbonSource
     // if path is provided we can directly create Hadoop relation. \
     // Otherwise create datasource relation
     parameters.get("path") match {
-      case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters)
+      case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters, None)
       case _ =>
         val options = new CarbonOption(parameters)
         val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
@@ -92,7 +89,7 @@ class CarbonSource
     val storePath = CarbonContext.getInstance(sqlContext.sparkContext).storePath
     val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-      .exists(tablePath)
+        .exists(tablePath)
     val (doSave, doAppend) = (mode, isExists) match {
       case (SaveMode.ErrorIfExists, true) =>
         sys.error(s"ErrorIfExists mode, path $storePath already exists.")
@@ -123,24 +120,24 @@ class CarbonSource
       dataSchema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): HadoopFsRelation = {
-    CarbonDatasourceHadoopRelation(sqlContext, paths, parameters)
+    CarbonDatasourceHadoopRelation(sqlContext, paths, parameters, dataSchema)
   }
 }
 
 /**
- * Creates carbon relation compliant to data source api.
+ *  Creates carbon relation compliant to data source api.
  * This relation is stored to hive metastore
  */
 private[sql] case class CarbonDatasourceRelation(
     tableIdentifier: TableIdentifier,
     alias: Option[String])
-  (@transient context: SQLContext)
-  extends BaseRelation with Serializable with Logging {
+    (@transient context: SQLContext)
+    extends BaseRelation with Serializable with Logging {
 
   def carbonRelation: CarbonRelation = {
     CarbonEnv.getInstance(context)
-      .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
-      .asInstanceOf[CarbonRelation]
+        .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
+        .asInstanceOf[CarbonRelation]
   }
 
   def schema: StructType = carbonRelation.schema
@@ -159,16 +156,16 @@ case class CarbonRelation(
     metaData: CarbonMetaData,
     tableMeta: TableMeta,
     alias: Option[String])(@transient sqlContext: SQLContext)
-  extends LeafNode with MultiInstanceRelation {
+    extends LeafNode with MultiInstanceRelation {
 
   def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
     childDim.getDataType.toString.toLowerCase match {
       case "array" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:array<${ getArrayChildren(childDim.getColName) }>"
+        childDim.getColName.substring(dimName.length + 1)
+      }:array<${ getArrayChildren(childDim.getColName) }>"
       case "struct" => s"${
-          childDim.getColName.substring(dimName.length + 1)
-        }:struct<${ getStructChildren(childDim.getColName) }>"
+        childDim.getColName.substring(dimName.length + 1)
+      }:struct<${ getStructChildren(childDim.getColName) }>"
       case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
     }
   }
@@ -195,31 +192,31 @@ case class CarbonRelation(
             .asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
         }>"
         case dType => s"${ childDim.getColName
-          .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
+            .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
       }
     }).mkString(",")
   }
 
   override def newInstance(): LogicalPlan = {
     CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)(sqlContext)
-      .asInstanceOf[this.type]
+        .asInstanceOf[this.type]
   }
 
   val dimensionsAttr = {
     val sett = new LinkedHashSet(
       tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
-        .asScala.asJava)
+          .asScala.asJava)
     sett.asScala.toSeq.filter(!_.getColumnSchema.isInvisible).map(dim => {
       val dimval = metaData.carbonTable
-        .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+          .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
       val output: DataType = dimval.getDataType
-        .toString.toLowerCase match {
-        case "array" => CarbonMetastoreTypes
-          .toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
-        case "struct" => CarbonMetastoreTypes
-          .toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
+          .toString.toLowerCase match {
+        case "array" =>
+          CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
+        case "struct" =>
+          CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
         case dType =>
-          var dataType = addDecimalScaleAndPrecision(dimval, dType)
+          val dataType = addDecimalScaleAndPrecision(dimval, dType)
           CarbonMetastoreTypes.toDataType(dataType)
       }
 
@@ -234,17 +231,17 @@ case class CarbonRelation(
     val factTable = tableMeta.carbonTable.getFactTableName
     new LinkedHashSet(
       tableMeta.carbonTable.
-        getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
-        asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
+          getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
+          asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
         .map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
-        metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
-          .toLowerCase match {
-          case "int" => "long"
-          case "short" => "long"
-          case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
-          case others => others
-        }),
-      nullable = true)(qualifiers = tableName +: alias.toSeq))
+          metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
+              .toLowerCase match {
+            case "int" => "long"
+            case "short" => "long"
+            case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
+            case others => others
+          }),
+          nullable = true)(qualifiers = tableName +: alias.toSeq))
   }
 
   override val output = dimensionsAttr ++ measureAttr
@@ -265,8 +262,7 @@ case class CarbonRelation(
     if (dimval.getDataType
         == org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL) {
       dType +=
-        "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema
-          .getScale + ")"
+          "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
     }
     dType
   }
@@ -277,8 +273,8 @@ case class CarbonRelation(
 
   def sizeInBytes: Long = {
     val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
-        tableMeta.carbonTable.getAbsoluteTableIdentifier)
-          .getTableStatusLastModifiedTime
+      tableMeta.carbonTable.getAbsoluteTableIdentifier)
+        .getTableStatusLastModifiedTime
     if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
       val tablePath = CarbonStorePath.getCarbonTablePath(
         tableMeta.storePath,
@@ -294,5 +290,3 @@ case class CarbonRelation(
 
 }
 
-
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index c2e85d1..c105cae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -148,9 +148,9 @@ case class CarbonScan(
     buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
 
     val tableCreationTime = carbonCatalog
-      .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
+        .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
     val schemaLastUpdatedTime = carbonCatalog
-      .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
+        .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
     val big = new CarbonScanRDD(
       ocRaw.sparkContext,
       model,
@@ -189,5 +189,3 @@ case class CarbonScan(
   }
 
 }
-
-

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
index 73aa2b0..ebcaf75 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
@@ -21,9 +21,9 @@ package org.apache.carbondata.integration.spark.testsuite.dataload
 
 import java.io.File
 
-import org.apache.spark.sql.{Row, DataFrame, SaveMode}
 import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
@@ -40,23 +40,32 @@ class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
     df = sc.parallelize(1 to 1000)
         .map(x => ("a", "b", x))
         .toDF("c1", "c2", "c3")
-  }
 
-  test("read and write using CarbonContext") {
     // save dataframe to carbon file
     df.write
         .format("carbondata")
         .option("tableName", "carbon1")
         .mode(SaveMode.Overwrite)
         .save()
+  }
 
+  test("read and write using CarbonContext") {
     val in = read
         .format("carbondata")
         .option("tableName", "carbon1")
         .load()
 
     assert(in.where("c3 > 500").count() == 500)
-    sql("DROP TABLE IF EXISTS carbon1")
+  }
+
+  test("read and write using CarbonContext with compression") {
+    val in = read
+        .format("carbondata")
+        .option("tableName", "carbon1")
+        .option("compress", "true")
+        .load()
+
+    assert(in.where("c3 > 500").count() == 500)
   }
 
   test("saveAsCarbon API") {
@@ -64,18 +73,75 @@ class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
     df.saveAsCarbonFile(Map("tableName" -> "carbon2"))
 
     checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
-    sql("DROP TABLE IF EXISTS carbon2")
   }
 
-  test("saveAsCarbon API using compression") {
-    import org.apache.carbondata.spark._
-    df.saveAsCarbonFile(Map("tableName" -> "carbon2", "compress" -> "true"))
+  test("query using SQLContext") {
+    val sqlContext = new SQLContext(sparkContext)
+    sqlContext.sql(
+      s"""
+         | CREATE TEMPORARY TABLE temp
+         | (c1 string, c2 string, c3 long)
+         | USING org.apache.spark.sql.CarbonSource
+         | OPTIONS (path '$storePath/default/carbon1')
+      """.stripMargin)
+    checkAnswer(sqlContext.sql(
+      """
+        | SELECT c1, c2, count(*)
+        | FROM temp
+        | WHERE c3 > 100
+        | GROUP BY c1, c2
+      """.stripMargin), Seq(Row("a", "b", 900)))
+    sqlContext.dropTempTable("temp")
+  }
 
-    checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
-    sql("DROP TABLE IF EXISTS carbon2")
+  test("query using SQLContext without providing schema") {
+    val sqlContext = new SQLContext(sparkContext)
+    sqlContext.sql(
+      s"""
+         | CREATE TEMPORARY TABLE temp
+         | USING org.apache.spark.sql.CarbonSource
+         | OPTIONS (path '$storePath/default/carbon1')
+      """.stripMargin)
+    checkAnswer(sqlContext.sql(
+      """
+        | SELECT c1, c2, count(*)
+        | FROM temp
+        | WHERE c3 > 100
+        | GROUP BY c1, c2
+      """.stripMargin), Seq(Row("a", "b", 900)))
+    sqlContext.dropTempTable("temp")
   }
 
-  override def afterAll {
+  test("query using SQLContext, multiple load") {
+    sql("DROP TABLE IF EXISTS test")
+    sql(
+      """
+        | CREATE TABLE test(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val testData = currentDirectory + "/src/test/resources/sample.csv"
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
+
+    val sqlContext = new SQLContext(sparkContext)
+    sqlContext.sql(
+      s"""
+         | CREATE TEMPORARY TABLE temp
+         | (id long, name string, city string, age long)
+         | USING org.apache.spark.sql.CarbonSource
+         | OPTIONS (path '$storePath/default/test')
+      """.stripMargin)
+    checkAnswer(sqlContext.sql(
+      """
+        | SELECT count(id)
+        | FROM temp
+      """.stripMargin), Seq(Row(8)))
+    sqlContext.dropTempTable("temp")
+    sql("DROP TABLE test")
+  }
 
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS carbon1")
+    sql("DROP TABLE IF EXISTS carbon2")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
index e28ddc0..0838a2b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
@@ -49,13 +49,13 @@ class HadoopFSRelationTestCase extends QueryTest with BeforeAndAfterAll {
 
   test("hadoopfsrelation select all test") {
     val rdd = read.format("org.apache.spark.sql.CarbonSource")
-      .option("tableName", "hadoopfsrelation").load("./target/test")
+      .option("tableName", "hadoopfsrelation").load()
     assert(rdd.collect().length > 0)
   }
 
   test("hadoopfsrelation filters test") {
     val rdd: DataFrame = read.format("org.apache.spark.sql.CarbonSource")
-      .option("tableName", "hadoopfsrelation").load("./target/test")
+      .option("tableName", "hadoopfsrelation").load()
       .select("empno", "empname", "utilization").where("empname in ('arvind','ayushi')")
     checkAnswer(
       rdd,


[2/2] incubator-carbondata git commit: [CARBONDATA-212] Use SQLContext to read CarbonData files This closes #126

Posted by ch...@apache.org.
[CARBONDATA-212] Use SQLContext to read CarbonData files This closes #126


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/57ea8c5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/57ea8c5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/57ea8c5e

Branch: refs/heads/master
Commit: 57ea8c5e880668d029ad89df2e9e4153ae141506
Parents: e0d1bad 528c2be
Author: chenliang613 <ch...@apache.org>
Authored: Thu Sep 15 00:25:52 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Sep 15 00:25:52 2016 +0800

----------------------------------------------------------------------
 .../core/carbon/AbsoluteTableIdentifier.java    |  15 ++
 .../core/carbon/path/CarbonTablePath.java       |  10 +-
 .../carbondata/examples/DatasourceExample.scala |  54 ++++++++
 .../examples/util/InitForExamples.scala         |   4 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   9 +-
 .../carbondata/hadoop/util/SchemaReader.java    |  25 ++--
 .../spark/implicit/DataFrameFuncs.scala         | 122 ++++++++++++++++
 .../org/apache/carbondata/spark/package.scala   | 113 +--------------
 .../carbondata/spark/util/QueryPlanUtil.scala   |  12 ++
 .../sql/CarbonDatasourceHadoopRelation.scala    | 138 +++++++++++--------
 .../spark/sql/CarbonDatasourceRelation.scala    |  82 +++++------
 .../org/apache/spark/sql/CarbonOperators.scala  |   6 +-
 .../dataload/SparkDatasourceSuite.scala         |  88 ++++++++++--
 .../HadoopFSRelationTestCase.scala              |   4 +-
 14 files changed, 437 insertions(+), 245 deletions(-)
----------------------------------------------------------------------