Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/09/14 16:26:21 UTC
[1/2] incubator-carbondata git commit: [CARBONDATA-212] Use SQLContext to read CarbonData files
Repository: incubator-carbondata
Updated Branches:
refs/heads/master e0d1bad52 -> 57ea8c5e8
[CARBONDATA-212] Use SQLContext to read CarbonData files
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/528c2bef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/528c2bef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/528c2bef
Branch: refs/heads/master
Commit: 528c2bef86bda2bb62dd29c53a229ebb515fe8a9
Parents: e0d1bad
Author: Jacky Li <ja...@huawei.com>
Authored: Thu Sep 15 00:23:12 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Sep 15 00:23:12 2016 +0800
----------------------------------------------------------------------
.../core/carbon/AbsoluteTableIdentifier.java | 15 ++
.../core/carbon/path/CarbonTablePath.java | 10 +-
.../carbondata/examples/DatasourceExample.scala | 54 ++++++++
.../examples/util/InitForExamples.scala | 4 +-
.../carbondata/hadoop/CarbonInputFormat.java | 9 +-
.../carbondata/hadoop/util/SchemaReader.java | 25 ++--
.../spark/implicit/DataFrameFuncs.scala | 122 ++++++++++++++++
.../org/apache/carbondata/spark/package.scala | 113 +--------------
.../carbondata/spark/util/QueryPlanUtil.scala | 12 ++
.../sql/CarbonDatasourceHadoopRelation.scala | 138 +++++++++++--------
.../spark/sql/CarbonDatasourceRelation.scala | 82 +++++------
.../org/apache/spark/sql/CarbonOperators.scala | 6 +-
.../dataload/SparkDatasourceSuite.scala | 88 ++++++++++--
.../HadoopFSRelationTestCase.scala | 4 +-
14 files changed, 437 insertions(+), 245 deletions(-)
----------------------------------------------------------------------
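In short, this change lets CarbonData files written through a CarbonContext be read back with a plain SQLContext. A minimal sketch of the new read path, following the DatasourceExample added below (the path assumes that example's local store layout):

  // sc is an existing SparkContext; table1 was written by CarbonContext beforehand
  val sqlContext = new SQLContext(sc)
  sqlContext.sql(
    """
      | CREATE TEMPORARY TABLE source
      | USING org.apache.spark.sql.CarbonSource
      | OPTIONS (path './examples/target/store/default/table1')
    """.stripMargin)
  sqlContext.sql("SELECT c1, c2, count(*) FROM source WHERE c3 > 100 GROUP BY c1, c2").show()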
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
index c8f603a..0e1481d 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifier.java
@@ -63,6 +63,21 @@ public class AbsoluteTableIdentifier implements Serializable {
return carbonTableIdentifier;
}
+ public static AbsoluteTableIdentifier fromTablePath(String tablePath) {
+ String[] names = tablePath.replace('\\', '/').split("/");
+ if (names.length < 3) {
+ throw new IllegalArgumentException("invalid table path: " + tablePath);
+ }
+
+ String tableName = names[names.length - 1];
+ String dbName = names[names.length - 2];
+ String storePath = tablePath.substring(0, tablePath.lastIndexOf(dbName));
+
+ CarbonTableIdentifier identifier =
+ new CarbonTableIdentifier(dbName, tableName, Long.toString(System.currentTimeMillis()));
+ return new AbsoluteTableIdentifier(storePath, identifier);
+ }
+
/**
* to get the hash code
*/
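A hedged sketch of what the new factory method resolves, assuming the store layout is <storePath>/<dbName>/<tableName> (the path below is made up for illustration):

  // "/user/store/default/table1" -> storePath "/user/store/", db "default", table "table1"
  val identifier = AbsoluteTableIdentifier.fromTablePath("/user/store/default/table1")
  identifier.getStorePath                              // "/user/store/" (up to the db segment)
  identifier.getCarbonTableIdentifier.getDatabaseName  // "default"
  identifier.getCarbonTableIdentifier.getTableName     // "table1"

Note that the table id is stamped with System.currentTimeMillis(), so two calls on the same path produce identifiers differing only in that field.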
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
index 80a39f1..99531e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java
@@ -30,7 +30,6 @@ import static org.apache.carbondata.core.constants.CarbonCommonConstants.INVALID
import org.apache.hadoop.fs.Path;
-
/**
* Helps to get Table content paths.
*/
@@ -185,6 +184,15 @@ public class CarbonTablePath extends Path {
}
/**
+ * return the schema file path
+ * @param tablePath path to table files
+ * @return schema file path
+ */
+ public static String getSchemaFilePath(String tablePath) {
+ return tablePath + File.separator + METADATA_DIR + File.separator + SCHEMA_FILE;
+ }
+
+ /**
* @return absolute path of table status file
*/
public String getTableStatusFilePath() {
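For reference, a sketch of the path the new static helper composes, assuming METADATA_DIR and SCHEMA_FILE carry their usual values "Metadata" and "schema" in this codebase (the input path is hypothetical; the real output uses File.separator, shown here as '/'):

  CarbonTablePath.getSchemaFilePath("/store/default/table1")
  // => "/store/default/table1/Metadata/schema"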
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
new file mode 100644
index 0000000..f19dc13
--- /dev/null
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DatasourceExample.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import org.apache.spark.sql.{SaveMode, SQLContext}
+
+import org.apache.carbondata.examples.util.InitForExamples
+
+object DatasourceExample {
+
+ def main(args: Array[String]) {
+ // use CarbonContext to write CarbonData files
+ val cc = InitForExamples.createCarbonContext("DatasourceExample")
+ import cc.implicits._
+ val sc = cc.sparkContext
+ // create a dataframe, it can be from parquet or hive table
+ val df = sc.parallelize(1 to 1000)
+ .map(x => ("a", "b", x))
+ .toDF("c1", "c2", "c3")
+
+ // save dataframe to CarbonData files
+ df.write
+ .format("carbondata")
+ .option("tableName", "table1")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ // Use SQLContext to read CarbonData files
+ val sqlContext = new SQLContext(sc)
+ sqlContext.sql(
+ """
+ | CREATE TEMPORARY TABLE source
+ | USING org.apache.spark.sql.CarbonSource
+ | OPTIONS (path './examples/target/store/default/table1')
+ """.stripMargin)
+ sqlContext.sql("SELECT c1, c2, count(*) FROM source WHERE c3 > 100 GROUP BY c1, c2").show
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
index 21377d7..46d2bc1 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
@@ -35,8 +35,8 @@ object InitForExamples {
def createCarbonContext(appName: String): CarbonContext = {
val sc = new SparkContext(new SparkConf()
- .setAppName(appName)
- .setMaster("local[2]"))
+ .setAppName(appName)
+ .setMaster("local[2]"))
sc.setLogLevel("ERROR")
println(s"Starting $appName using spark version ${sc.version}")
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index da697ad..f6fae6c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -149,9 +149,12 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
String carbonTableStr = configuration.get(CARBON_TABLE);
if (carbonTableStr == null) {
- CarbonTable carbonTable = new SchemaReader()
- .readCarbonTableFromStore(getTablePath(configuration), getTableToAccess(configuration),
- getStorePathString(configuration));
+ // read it from schema file in the store
+ String storePath = configuration.get(INPUT_DIR, "");
+ AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(
+ storePath, getTableToAccess(configuration));
+ CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(
+ getTablePath(configuration), identifier);
setCarbonTable(configuration, carbonTable);
return carbonTable;
}
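A caller-side sketch of the new fallback: when no serialized CarbonTable is present in the Configuration, getCarbonTable now reads the schema file from the store path and caches the result back into the conf. The identifier and path below are assumptions for illustration, not taken from the patch:

  val conf = new Configuration()
  conf.set(FileInputFormat.INPUT_DIR, "/store/default/table1")
  CarbonInputFormat.setTableToAccess(conf, carbonTableIdentifier) // identifier assumed in scope
  val table = CarbonInputFormat.getCarbonTable(conf) // reads the schema file on first call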
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index f108b0a..e8c088f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.hadoop.util;
import java.io.IOException;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
import org.apache.carbondata.core.carbon.metadata.converter.SchemaConverter;
@@ -33,16 +34,17 @@ import org.apache.carbondata.core.reader.ThriftReader;
import org.apache.thrift.TBase;
/**
- * TODO: It should be removed after store manager implemetation.
+ * TODO: It should be removed after store manager implementation.
*/
public class SchemaReader {
- public CarbonTable readCarbonTableFromStore(CarbonTablePath carbonTablePath,
- CarbonTableIdentifier tableIdentifier, String storePath) throws IOException {
+ public static CarbonTable readCarbonTableFromStore(CarbonTablePath carbonTablePath,
+ AbsoluteTableIdentifier identifier) throws IOException {
String schemaFilePath = carbonTablePath.getSchemaFilePath();
- if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS)
- || FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
- String tableName = tableIdentifier.getTableName();
+ if (FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.LOCAL) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.HDFS) ||
+ FileFactory.isFileExist(schemaFilePath, FileFactory.FileType.VIEWFS)) {
+ String tableName = identifier.getCarbonTableIdentifier().getTableName();
ThriftReader.TBaseCreator createTBase = new ThriftReader.TBaseCreator() {
public TBase create() {
@@ -58,12 +60,15 @@ public class SchemaReader {
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
TableInfo wrapperTableInfo = schemaConverter
- .fromExternalToWrapperTableInfo(tableInfo, tableIdentifier.getDatabaseName(), tableName,
- storePath);
+ .fromExternalToWrapperTableInfo(tableInfo,
+ identifier.getCarbonTableIdentifier().getDatabaseName(), tableName,
+ identifier.getStorePath());
wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath));
CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
- return CarbonMetadata.getInstance().getCarbonTable(tableIdentifier.getTableUniqueName());
+ return CarbonMetadata.getInstance().getCarbonTable(
+ identifier.getCarbonTableIdentifier().getTableUniqueName());
+ } else {
+ return null;
}
- return null;
}
}
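Taken together with the AbsoluteTableIdentifier change, reading table metadata from a bare table path now looks roughly like this (a sketch mirroring what CarbonDatasourceHadoopRelation does further down):

  val absId = AbsoluteTableIdentifier.fromTablePath("/store/default/table1")
  val tablePath = CarbonStorePath.getCarbonTablePath(absId.getStorePath, absId.getCarbonTableIdentifier)
  val table = SchemaReader.readCarbonTableFromStore(tablePath, absId) // null if no schema file exists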
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
new file mode 100644
index 0000000..d5185cc
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.{CarbonContext, DataFrame, SaveMode}
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
+
+class DataFrameFuncs(dataFrame: DataFrame) {
+
+ /**
+ * Saves DataFrame as CarbonData files.
+ */
+ def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+ // To avoid the Derby problem, the dataframe needs to be written and read using CarbonContext
+ require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
+ "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
+ )
+
+ val options = new CarbonOption(parameters)
+ val tableName = options.tableName
+ val dbName = options.dbName
+
+ // temporary solution: write to csv file, then load the csv into carbon
+ val tempCSVFolder = s"./tempCSV"
+ dataFrame.write
+ .format(csvPackage)
+ .option("header", "true")
+ .mode(SaveMode.Overwrite)
+ .save(tempCSVFolder)
+
+ val sqlContext = dataFrame.sqlContext
+ val tempCSVPath = new Path(tempCSVFolder)
+ val fs = tempCSVPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+ try {
+ sqlContext.sql(makeCreateTableString(dataFrame.schema, options))
+
+ // add 'csv' as the file extension to all generated part files
+ val itor = fs.listFiles(tempCSVPath, true)
+ while (itor.hasNext) {
+ val f = itor.next()
+ if (f.getPath.getName.startsWith("part-")) {
+ val newPath = s"${ f.getPath.getParent }/${ f.getPath.getName }.csv"
+ if (!fs.rename(f.getPath, new Path(newPath))) {
+ sqlContext.sql(s"DROP TABLE IF EXISTS $dbName.$tableName")
+ throw new RuntimeException("File system rename failed when loading data into carbon")
+ }
+ }
+ }
+ sqlContext.sql(makeLoadString(tempCSVFolder, options))
+ } finally {
+ fs.delete(tempCSVPath, true)
+ }
+ }
+
+ private def csvPackage: String = "com.databricks.spark.csv"
+
+ private def convertToCarbonType(sparkType: DataType): String = {
+ sparkType match {
+ case StringType => CarbonType.STRING.name
+ case IntegerType => CarbonType.INT.name
+ case ByteType => CarbonType.INT.name
+ case ShortType => CarbonType.SHORT.name
+ case LongType => CarbonType.LONG.name
+ case FloatType => CarbonType.DOUBLE.name
+ case DoubleType => CarbonType.DOUBLE.name
+ case BooleanType => CarbonType.DOUBLE.name
+ case TimestampType => CarbonType.TIMESTAMP.name
+ case other => sys.error(s"unsupported type: $other")
+ }
+ }
+
+ private def makeCreateTableString(schema: StructType, option: CarbonOption): String = {
+ val tableName = option.tableName
+ val dbName = option.dbName
+ val carbonSchema = schema.map { field =>
+ s"${ field.name } ${ convertToCarbonType(field.dataType) }"
+ }
+ s"""
+ CREATE TABLE IF NOT EXISTS $dbName.$tableName
+ (${ carbonSchema.mkString(", ") })
+ STORED BY '${ CarbonContext.datasourceName }'
+ """
+ }
+
+ private def makeLoadString(csvFolder: String, option: CarbonOption): String = {
+ val tableName = option.tableName
+ val dbName = option.dbName
+ s"""
+ LOAD DATA INPATH '$csvFolder'
+ INTO TABLE $dbName.$tableName
+ """
+ }
+
+ def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
+ // find out table
+ // find out streaming segment
+ // for each rdd partition, find out the appendable carbon file
+ // check whether it is full
+ // if full, create new file
+ // append to it: create blocklet header and data, call thrift to convert, write hdfs
+ }
+
+}
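Usage sketch: the implicit conversion kept in the carbondata spark package object (rewritten in the next file) makes these methods available on any DataFrame created through a CarbonContext, matching the saveAsCarbon API test in SparkDatasourceSuite below:

  import org.apache.carbondata.spark._ // brings toDataFrameFuncs into scope
  df.saveAsCarbonFile(Map("tableName" -> "carbon2"))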
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
index b46af53..b1b91e7 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
@@ -17,116 +17,11 @@
package org.apache.carbondata
-import org.apache.hadoop.fs.Path
-import org.apache.spark.Logging
-import org.apache.spark.sql._
-import org.apache.spark.sql.types._
+import org.apache.spark.sql.DataFrame
-import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
-
-package object spark extends Logging {
-
- implicit class toCarbonDataFrame(dataFrame: DataFrame) {
-
- /**
- * Saves DataFrame as carbon files.
- */
- def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
- // To avoid derby problem, dataframe need to be writen and read using CarbonContext
- require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
- "Error in saving dataframe to carbon file, must use CarbonContext to save dataframe"
- )
-
- val storePath = dataFrame.sqlContext.asInstanceOf[CarbonContext].storePath
- val options = new CarbonOption(parameters)
- val dbName = options.dbName
- val tableName = options.tableName
-
- // temporary solution: write to csv file, then load the csv into carbon
- val tempCSVFolder = s"$storePath/$dbName/$tableName/tempCSV"
- var writer: DataFrameWriter =
- dataFrame.write
- .format(csvPackage)
- .option("header", "false")
- .mode(SaveMode.Overwrite)
-
- if (options.compress.equals("true")) {
- writer = writer.option("codec", "gzip")
- }
-
- writer.save(tempCSVFolder)
-
- val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
- val tempCSVPath = new Path(tempCSVFolder)
- val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
-
- def countSize(): Double = {
- var size: Double = 0
- val itor = fs.listFiles(tempCSVPath, true)
- while (itor.hasNext) {
- val f = itor.next()
- if (f.getPath.getName.startsWith("part-")) {
- size += f.getLen
- }
- }
- size
- }
-
- try {
- cc.sql(makeCreateTableString(dataFrame.schema, options))
- logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
- cc.sql(makeLoadString(tableName, tempCSVFolder))
- } finally {
- fs.delete(tempCSVPath, true)
- }
- }
-
- private def csvPackage: String = "com.databricks.spark.csv.newapi"
-
- private def convertToCarbonType(sparkType: DataType): String = {
- sparkType match {
- case StringType => CarbonType.STRING.name
- case IntegerType => CarbonType.INT.name
- case ByteType => CarbonType.INT.name
- case ShortType => CarbonType.SHORT.name
- case LongType => CarbonType.LONG.name
- case FloatType => CarbonType.DOUBLE.name
- case DoubleType => CarbonType.DOUBLE.name
- case BooleanType => CarbonType.DOUBLE.name
- case TimestampType => CarbonType.TIMESTAMP.name
- case other => sys.error(s"unsupported type: $other")
- }
- }
-
- private def makeCreateTableString(schema: StructType, option: CarbonOption): String = {
- val tableName = option.tableName
- val carbonSchema = schema.map { field =>
- s"${ field.name } ${ convertToCarbonType(field.dataType) }"
- }
- s"""
- CREATE TABLE IF NOT EXISTS $tableName
- (${ carbonSchema.mkString(", ") })
- STORED BY '${ CarbonContext.datasourceName }'
- """
- }
-
- private def makeLoadString(tableName: String, csvFolder: String): String = {
- s"""
- LOAD DATA INPATH '$csvFolder'
- INTO TABLE $tableName
- OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
- """
- }
-
- def appendToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
- // find out table
- // find out streaming segment
- // for each rdd partition, find out the appendable carbon file
- // check whether it is full
- // if full, create new file
- // append to it: create blocklet header and data, call thrift to convert, write hdfs
- }
+package object spark {
+ implicit def toDataFrameFuncs(dataFrame: DataFrame): DataFrameFuncs = {
+ new DataFrameFuncs(dataFrame)
}
-
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
index 84b4c29..4dfcd3f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala
@@ -17,6 +17,8 @@
package org.apache.carbondata.spark.util
+import scala.reflect.ClassTag
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
@@ -26,6 +28,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
import org.apache.carbondata.hadoop.CarbonInputFormat
+
/**
* All the utility functions for carbon plan creation
*/
@@ -44,4 +47,13 @@ object QueryPlanUtil {
absoluteTableIdentifier.getCarbonTableIdentifier)
(carbonInputFormat, job)
}
+
+ def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier,
+ conf: Configuration) : CarbonInputFormat[V] = {
+ val carbonInputFormat = new CarbonInputFormat[V]()
+ val job: Job = new Job(conf)
+ FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getStorePath))
+ CarbonInputFormat.setTableToAccess(conf, absoluteTableIdentifier.getCarbonTableIdentifier)
+ carbonInputFormat
+ }
}
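The new overload builds just the CarbonInputFormat for callers that manage the Hadoop configuration themselves, as CarbonHadoopFSRDD does below. A minimal sketch (absId and hadoopConf assumed in scope):

  val format = QueryPlanUtil.createCarbonInputFormat[Row](absId, hadoopConf)
  // setTableToAccess writes into hadoopConf directly, but the input path is added to a
  // Job-internal copy; that appears to be why the callers below also set
  // FileInputFormat.INPUT_DIR on their own conf after this call
  hadoopConf.set(FileInputFormat.INPUT_DIR, absId.getStorePath)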
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index cf82072..7a8397c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -20,75 +20,89 @@ package org.apache.spark.sql
import java.text.SimpleDateFormat
import java.util.Date
-import scala.reflect.ClassTag
-
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier
+import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
+import org.apache.carbondata.scan.expression.logical.AndExpression
+import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
+import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
+import org.apache.carbondata.spark.util.QueryPlanUtil
+import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FileStatus
+import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{Job, JobID}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.{Job, JobID}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, DistributionUtil, TableMeta}
+import org.apache.spark.sql.hive.{DistributionUtil, TableMeta}
import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
-import org.apache.carbondata.core.carbon.CarbonTableIdentifier
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
-import org.apache.carbondata.scan.expression.logical.AndExpression
-import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
-import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
-import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
+import scala.reflect.ClassTag
-private[sql] case class CarbonDatasourceHadoopRelation(sqlContext: SQLContext,
+private[sql] case class CarbonDatasourceHadoopRelation(
+ sqlContext: SQLContext,
paths: Array[String],
- parameters: Map[String, String])
- extends HadoopFsRelation {
-
- val (carbonRelation, jobConf) = {
- val options = new CarbonOption(parameters)
- val job: Job = new Job(new JobConf())
- FileInputFormat.setInputPaths(job, paths.head)
- val identifier = new CarbonTableIdentifier(options.dbName, options.tableName, options.tableId)
- CarbonInputFormat.setTableToAccess(job.getConfiguration, identifier)
- val table = CarbonInputFormat.getCarbonTable(job.getConfiguration)
- if(table == null) {
- sys.error(s"Store path ${paths.head} is not valid or " +
- s"table ${identifier.getTableUniqueName} does not exist in path.")
+ parameters: Map[String, String],
+ tableSchema: Option[StructType])
+ extends HadoopFsRelation {
+
+ lazy val schemaPath = new Path(CarbonTablePath.getSchemaFilePath(paths.head))
+ if (!schemaPath.getFileSystem(new Configuration).exists(schemaPath)) {
+ throw new IllegalArgumentException("invalid CarbonData file path: " + paths.head)
+ }
+
+ lazy val job = new Job(new JobConf())
+ lazy val options = new CarbonOption(parameters)
+ lazy val absIdentifier = AbsoluteTableIdentifier.fromTablePath(paths.head)
+ lazy val identifier = absIdentifier.getCarbonTableIdentifier
+ lazy val relationRaw: CarbonRelation = {
+ val carbonTable = SchemaReader.readCarbonTableFromStore(
+ CarbonStorePath.getCarbonTablePath(absIdentifier.getStorePath, identifier),
+ absIdentifier)
+ if (carbonTable == null) {
+ sys.error(s"CarbonData file path ${paths.head} is not valid")
}
- val relation = CarbonRelation(table.getDatabaseName,
- table.getFactTableName,
- CarbonSparkUtil.createSparkMeta(table),
+ CarbonRelation(
+ carbonTable.getDatabaseName,
+ carbonTable.getFactTableName,
+ CarbonSparkUtil.createSparkMeta(carbonTable),
TableMeta(identifier,
paths.head,
- table,
+ carbonTable,
Partitioner(options.partitionClass,
Array(""),
options.partitionCount.toInt,
- DistributionUtil.getNodeList(sqlContext.sparkContext))),
- None)(sqlContext)
-
- (relation, job.getConfiguration)
+ DistributionUtil.getNodeList(sqlContext.sparkContext)
+ )
+ ),
+ None
+ )(sqlContext)
}
- def dataSchema: StructType = carbonRelation.schema
+ override def dataSchema: StructType = tableSchema.getOrElse(relationRaw.schema)
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- // TODO: implement it
+ // TODO
throw new UnsupportedOperationException
}
- override def buildScan(requiredColumns: Array[String],
+ override def buildScan(
+ requiredColumns: Array[String],
filters: Array[Filter],
inputFiles: Array[FileStatus]): RDD[Row] = {
- val conf = new Configuration(jobConf)
- filters.flatMap(f => CarbonFilters.createCarbonFilter(dataSchema, f))
- .reduceOption(new AndExpression(_, _))
- .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+ val conf = new Configuration(job.getConfiguration)
+ filters.flatMap { filter =>
+ CarbonFilters.createCarbonFilter(dataSchema, filter)
+ }.reduceOption(new AndExpression(_, _))
+ .foreach(CarbonInputFormat.setFilterPredicates(conf, _))
+
val projection = new CarbonProjection
requiredColumns.foreach(projection.addColumn)
CarbonInputFormat.setColumnProjection(projection, conf)
@@ -96,14 +110,17 @@ private[sql] case class CarbonDatasourceHadoopRelation(sqlContext: SQLContext,
new CarbonHadoopFSRDD[Row](sqlContext.sparkContext,
new SerializableConfiguration(conf),
+ absIdentifier,
classOf[CarbonInputFormat[Row]],
- classOf[Row])
+ classOf[Row]
+ )
}
+
}
class CarbonHadoopFSPartition(rddId: Int, val idx: Int,
val carbonSplit: SerializableWritable[CarbonInputSplit])
- extends Partition {
+ extends Partition {
override val index: Int = idx
@@ -113,40 +130,32 @@ class CarbonHadoopFSPartition(rddId: Int, val idx: Int,
class CarbonHadoopFSRDD[V: ClassTag](
@transient sc: SparkContext,
conf: SerializableConfiguration,
+ identifier: AbsoluteTableIdentifier,
inputFormatClass: Class[_ <: CarbonInputFormat[V]],
valueClass: Class[V])
- extends RDD[V](sc, Nil)
- with SparkHadoopMapReduceUtil
- with Logging {
+ extends RDD[V](sc, Nil)
+ with SparkHadoopMapReduceUtil
+ with Logging {
private val jobTrackerId: String = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
formatter.format(new Date())
}
-
@transient protected val jobId = new JobID(jobTrackerId, id)
- override protected def getPartitions: Array[Partition] = {
- val inputFormat = inputFormatClass.newInstance
- val jobContext = newJobContext(conf.value, jobId)
- val splits = inputFormat.getSplits(jobContext).toArray
- val carbonInputSplits = splits
- .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
- carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
- }
-
@DeveloperApi
override def compute(split: Partition,
context: TaskContext): Iterator[V] = {
val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId)
- val inputFormat = inputFormatClass.newInstance
+ val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
+ hadoopAttemptContext.getConfiguration)
+ hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getStorePath)
val reader =
inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
hadoopAttemptContext)
- reader
- .initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
- hadoopAttemptContext)
+ reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value,
+ hadoopAttemptContext)
new Iterator[V] {
private[this] var havePair = false
private[this] var finished = false
@@ -174,4 +183,15 @@ class CarbonHadoopFSRDD[V: ClassTag](
}
}
}
+
+ override protected def getPartitions: Array[Partition] = {
+ val jobContext = newJobContext(conf.value, jobId)
+ val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier,
+ jobContext.getConfiguration)
+ jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getStorePath)
+ val splits = carbonInputFormat.getSplits(jobContext).toArray
+ val carbonInputSplits = splits
+ .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit]))
+ carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1))
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 6240c7c..f219ef5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -42,11 +42,8 @@ import org.apache.carbondata.spark.{CarbonOption, _}
* Carbon relation provider compliant to data source api.
* Creates carbon relations
*/
-class CarbonSource
- extends RelationProvider
- with CreatableRelationProvider
- with HadoopFsRelationProvider
- with DataSourceRegister {
+class CarbonSource extends RelationProvider
+ with CreatableRelationProvider with HadoopFsRelationProvider with DataSourceRegister {
override def shortName(): String = "carbondata"
@@ -61,7 +58,7 @@ class CarbonSource
// if path is provided we can directly create Hadoop relation. \
// Otherwise create datasource relation
parameters.get("path") match {
- case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters)
+ case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters, None)
case _ =>
val options = new CarbonOption(parameters)
val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
@@ -92,7 +89,7 @@ class CarbonSource
val storePath = CarbonContext.getInstance(sqlContext.sparkContext).storePath
val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
- .exists(tablePath)
+ .exists(tablePath)
val (doSave, doAppend) = (mode, isExists) match {
case (SaveMode.ErrorIfExists, true) =>
sys.error(s"ErrorIfExists mode, path $storePath already exists.")
@@ -123,24 +120,24 @@ class CarbonSource
dataSchema: Option[StructType],
partitionColumns: Option[StructType],
parameters: Map[String, String]): HadoopFsRelation = {
- CarbonDatasourceHadoopRelation(sqlContext, paths, parameters)
+ CarbonDatasourceHadoopRelation(sqlContext, paths, parameters, dataSchema)
}
}
/**
- * Creates carbon relation compliant to data source api.
+ * Creates carbon relation compliant to data source api.
* This relation is stored to hive metastore
*/
private[sql] case class CarbonDatasourceRelation(
tableIdentifier: TableIdentifier,
alias: Option[String])
- (@transient context: SQLContext)
- extends BaseRelation with Serializable with Logging {
+ (@transient context: SQLContext)
+ extends BaseRelation with Serializable with Logging {
def carbonRelation: CarbonRelation = {
CarbonEnv.getInstance(context)
- .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
- .asInstanceOf[CarbonRelation]
+ .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
+ .asInstanceOf[CarbonRelation]
}
def schema: StructType = carbonRelation.schema
@@ -159,16 +156,16 @@ case class CarbonRelation(
metaData: CarbonMetaData,
tableMeta: TableMeta,
alias: Option[String])(@transient sqlContext: SQLContext)
- extends LeafNode with MultiInstanceRelation {
+ extends LeafNode with MultiInstanceRelation {
def recursiveMethod(dimName: String, childDim: CarbonDimension): String = {
childDim.getDataType.toString.toLowerCase match {
case "array" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:array<${ getArrayChildren(childDim.getColName) }>"
+ childDim.getColName.substring(dimName.length + 1)
+ }:array<${ getArrayChildren(childDim.getColName) }>"
case "struct" => s"${
- childDim.getColName.substring(dimName.length + 1)
- }:struct<${ getStructChildren(childDim.getColName) }>"
+ childDim.getColName.substring(dimName.length + 1)
+ }:struct<${ getStructChildren(childDim.getColName) }>"
case dType => s"${ childDim.getColName.substring(dimName.length + 1) }:${ dType }"
}
}
@@ -195,31 +192,31 @@ case class CarbonRelation(
.asScala.map(f => s"${ recursiveMethod(childDim.getColName, f) }").mkString(",")
}>"
case dType => s"${ childDim.getColName
- .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
+ .substring(dimName.length() + 1) }:${ addDecimalScaleAndPrecision(childDim, dType) }"
}
}).mkString(",")
}
override def newInstance(): LogicalPlan = {
CarbonRelation(databaseName, tableName, metaData, tableMeta, alias)(sqlContext)
- .asInstanceOf[this.type]
+ .asInstanceOf[this.type]
}
val dimensionsAttr = {
val sett = new LinkedHashSet(
tableMeta.carbonTable.getDimensionByTableName(tableMeta.carbonTableIdentifier.getTableName)
- .asScala.asJava)
+ .asScala.asJava)
sett.asScala.toSeq.filter(!_.getColumnSchema.isInvisible).map(dim => {
val dimval = metaData.carbonTable
- .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
+ .getDimensionByName(metaData.carbonTable.getFactTableName, dim.getColName)
val output: DataType = dimval.getDataType
- .toString.toLowerCase match {
- case "array" => CarbonMetastoreTypes
- .toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
- case "struct" => CarbonMetastoreTypes
- .toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
+ .toString.toLowerCase match {
+ case "array" =>
+ CarbonMetastoreTypes.toDataType(s"array<${ getArrayChildren(dim.getColName) }>")
+ case "struct" =>
+ CarbonMetastoreTypes.toDataType(s"struct<${ getStructChildren(dim.getColName) }>")
case dType =>
- var dataType = addDecimalScaleAndPrecision(dimval, dType)
+ val dataType = addDecimalScaleAndPrecision(dimval, dType)
CarbonMetastoreTypes.toDataType(dataType)
}
@@ -234,17 +231,17 @@ case class CarbonRelation(
val factTable = tableMeta.carbonTable.getFactTableName
new LinkedHashSet(
tableMeta.carbonTable.
- getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
- asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
+ getMeasureByTableName(tableMeta.carbonTable.getFactTableName).
+ asScala.asJava).asScala.toSeq.filter(!_.getColumnSchema.isInvisible)
.map(x => AttributeReference(x.getColName, CarbonMetastoreTypes.toDataType(
- metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
- .toLowerCase match {
- case "int" => "long"
- case "short" => "long"
- case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
- case others => others
- }),
- nullable = true)(qualifiers = tableName +: alias.toSeq))
+ metaData.carbonTable.getMeasureByName(factTable, x.getColName).getDataType.toString
+ .toLowerCase match {
+ case "int" => "long"
+ case "short" => "long"
+ case "decimal" => "decimal(" + x.getPrecision + "," + x.getScale + ")"
+ case others => others
+ }),
+ nullable = true)(qualifiers = tableName +: alias.toSeq))
}
override val output = dimensionsAttr ++ measureAttr
@@ -265,8 +262,7 @@ case class CarbonRelation(
if (dimval.getDataType
== org.apache.carbondata.core.carbon.metadata.datatype.DataType.DECIMAL) {
dType +=
- "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema
- .getScale + ")"
+ "(" + dimval.getColumnSchema.getPrecision + "," + dimval.getColumnSchema.getScale + ")"
}
dType
}
@@ -277,8 +273,8 @@ case class CarbonRelation(
def sizeInBytes: Long = {
val tableStatusNewLastUpdatedTime = new SegmentStatusManager(
- tableMeta.carbonTable.getAbsoluteTableIdentifier)
- .getTableStatusLastModifiedTime
+ tableMeta.carbonTable.getAbsoluteTableIdentifier)
+ .getTableStatusLastModifiedTime
if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) {
val tablePath = CarbonStorePath.getCarbonTablePath(
tableMeta.storePath,
@@ -294,5 +290,3 @@ case class CarbonRelation(
}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
index c2e85d1..c105cae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala
@@ -148,9 +148,9 @@ case class CarbonScan(
buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
val tableCreationTime = carbonCatalog
- .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
+ .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
val schemaLastUpdatedTime = carbonCatalog
- .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
+ .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
val big = new CarbonScanRDD(
ocRaw.sparkContext,
model,
@@ -189,5 +189,3 @@ case class CarbonScan(
}
}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
index 73aa2b0..ebcaf75 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
@@ -21,9 +21,9 @@ package org.apache.carbondata.integration.spark.testsuite.dataload
import java.io.File
-import org.apache.spark.sql.{Row, DataFrame, SaveMode}
import org.apache.spark.sql.common.util.CarbonHiveContext._
import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
import org.scalatest.BeforeAndAfterAll
class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
@@ -40,23 +40,32 @@ class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
df = sc.parallelize(1 to 1000)
.map(x => ("a", "b", x))
.toDF("c1", "c2", "c3")
- }
- test("read and write using CarbonContext") {
// save dataframe to carbon file
df.write
.format("carbondata")
.option("tableName", "carbon1")
.mode(SaveMode.Overwrite)
.save()
+ }
+ test("read and write using CarbonContext") {
val in = read
.format("carbondata")
.option("tableName", "carbon1")
.load()
assert(in.where("c3 > 500").count() == 500)
- sql("DROP TABLE IF EXISTS carbon1")
+ }
+
+ test("read and write using CarbonContext with compression") {
+ val in = read
+ .format("carbondata")
+ .option("tableName", "carbon1")
+ .option("compress", "true")
+ .load()
+
+ assert(in.where("c3 > 500").count() == 500)
}
test("saveAsCarbon API") {
@@ -64,18 +73,75 @@ class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
df.saveAsCarbonFile(Map("tableName" -> "carbon2"))
checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
- sql("DROP TABLE IF EXISTS carbon2")
}
- test("saveAsCarbon API using compression") {
- import org.apache.carbondata.spark._
- df.saveAsCarbonFile(Map("tableName" -> "carbon2", "compress" -> "true"))
+ test("query using SQLContext") {
+ val sqlContext = new SQLContext(sparkContext)
+ sqlContext.sql(
+ s"""
+ | CREATE TEMPORARY TABLE temp
+ | (c1 string, c2 string, c3 long)
+ | USING org.apache.spark.sql.CarbonSource
+ | OPTIONS (path '$storePath/default/carbon1')
+ """.stripMargin)
+ checkAnswer(sqlContext.sql(
+ """
+ | SELECT c1, c2, count(*)
+ | FROM temp
+ | WHERE c3 > 100
+ | GROUP BY c1, c2
+ """.stripMargin), Seq(Row("a", "b", 900)))
+ sqlContext.dropTempTable("temp")
+ }
- checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
- sql("DROP TABLE IF EXISTS carbon2")
+ test("query using SQLContext without providing schema") {
+ val sqlContext = new SQLContext(sparkContext)
+ sqlContext.sql(
+ s"""
+ | CREATE TEMPORARY TABLE temp
+ | USING org.apache.spark.sql.CarbonSource
+ | OPTIONS (path '$storePath/default/carbon1')
+ """.stripMargin)
+ checkAnswer(sqlContext.sql(
+ """
+ | SELECT c1, c2, count(*)
+ | FROM temp
+ | WHERE c3 > 100
+ | GROUP BY c1, c2
+ """.stripMargin), Seq(Row("a", "b", 900)))
+ sqlContext.dropTempTable("temp")
}
- override def afterAll {
+ test("query using SQLContext, multiple load") {
+ sql("DROP TABLE IF EXISTS test")
+ sql(
+ """
+ | CREATE TABLE test(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ val testData = currentDirectory + "/src/test/resources/sample.csv"
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table test")
+
+ val sqlContext = new SQLContext(sparkContext)
+ sqlContext.sql(
+ s"""
+ | CREATE TEMPORARY TABLE temp
+ | (id long, name string, city string, age long)
+ | USING org.apache.spark.sql.CarbonSource
+ | OPTIONS (path '$storePath/default/test')
+ """.stripMargin)
+ checkAnswer(sqlContext.sql(
+ """
+ | SELECT count(id)
+ | FROM temp
+ """.stripMargin), Seq(Row(8)))
+ sqlContext.dropTempTable("temp")
+ sql("DROP TABLE test")
+ }
+ override def afterAll {
+ sql("DROP TABLE IF EXISTS carbon1")
+ sql("DROP TABLE IF EXISTS carbon2")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/528c2bef/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
index e28ddc0..0838a2b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/hadooprelation/HadoopFSRelationTestCase.scala
@@ -49,13 +49,13 @@ class HadoopFSRelationTestCase extends QueryTest with BeforeAndAfterAll {
test("hadoopfsrelation select all test") {
val rdd = read.format("org.apache.spark.sql.CarbonSource")
- .option("tableName", "hadoopfsrelation").load("./target/test")
+ .option("tableName", "hadoopfsrelation").load()
assert(rdd.collect().length > 0)
}
test("hadoopfsrelation filters test") {
val rdd: DataFrame = read.format("org.apache.spark.sql.CarbonSource")
- .option("tableName", "hadoopfsrelation").load("./target/test")
+ .option("tableName", "hadoopfsrelation").load()
.select("empno", "empname", "utilization").where("empname in ('arvind','ayushi')")
checkAnswer(
rdd,
[2/2] incubator-carbondata git commit: [CARBONDATA-212] Use SQLContext to read CarbonData files. This closes #126
Posted by ch...@apache.org.
[CARBONDATA-212] Use SQLContext to read CarbonData files. This closes #126
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/57ea8c5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/57ea8c5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/57ea8c5e
Branch: refs/heads/master
Commit: 57ea8c5e880668d029ad89df2e9e4153ae141506
Parents: e0d1bad 528c2be
Author: chenliang613 <ch...@apache.org>
Authored: Thu Sep 15 00:25:52 2016 +0800
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Sep 15 00:25:52 2016 +0800
----------------------------------------------------------------------
.../core/carbon/AbsoluteTableIdentifier.java | 15 ++
.../core/carbon/path/CarbonTablePath.java | 10 +-
.../carbondata/examples/DatasourceExample.scala | 54 ++++++++
.../examples/util/InitForExamples.scala | 4 +-
.../carbondata/hadoop/CarbonInputFormat.java | 9 +-
.../carbondata/hadoop/util/SchemaReader.java | 25 ++--
.../spark/implicit/DataFrameFuncs.scala | 122 ++++++++++++++++
.../org/apache/carbondata/spark/package.scala | 113 +--------------
.../carbondata/spark/util/QueryPlanUtil.scala | 12 ++
.../sql/CarbonDatasourceHadoopRelation.scala | 138 +++++++++++--------
.../spark/sql/CarbonDatasourceRelation.scala | 82 +++++------
.../org/apache/spark/sql/CarbonOperators.scala | 6 +-
.../dataload/SparkDatasourceSuite.scala | 88 ++++++++++--
.../HadoopFSRelationTestCase.scala | 4 +-
14 files changed, 437 insertions(+), 245 deletions(-)
----------------------------------------------------------------------