You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/09/06 14:42:05 UTC
[1/2] incubator-carbondata git commit: compress CSV file using GZIP
while loading
Repository: incubator-carbondata
Updated Branches:
refs/heads/master c7999c14e -> 005186223
compress CSV file using GZIP while loading
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/952ba386
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/952ba386
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/952ba386
Branch: refs/heads/master
Commit: 952ba38699367c7f336f4ed07ed606ad38d14e2c
Parents: c7999c1
Author: jackylk <ja...@huawei.com>
Authored: Tue Sep 6 20:07:23 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Sep 6 20:07:23 2016 +0530
----------------------------------------------------------------------
.../datastorage/store/impl/FileFactory.java | 41 +++---
.../examples/DataFrameAPIExample.scala | 11 +-
.../apache/carbondata/examples/PerfTest.scala | 1 -
.../examples/util/InitForExamples.scala | 1 -
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../carbondata/spark/csv/CarbonTextFile.scala | 2 +
.../carbondata/spark/csv/DefaultSource.scala | 17 ++-
.../org/apache/carbondata/spark/package.scala | 40 +++---
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +
.../spark/util/GlobalDictionaryUtil.scala | 1 +
.../spark/sql/CarbonDatasourceRelation.scala | 2 +-
.../scala/org/apache/spark/util/FileUtils.scala | 4 +-
.../spark/src/test/resources/sample.csv.gz | Bin 0 -> 106 bytes
.../MultiFilesDataLoagdingTestCase.scala | 2 +-
.../dataload/SparkDatasourceSuite.scala | 81 ++++++++++++
.../dataload/TestLoadDataGeneral.scala | 66 ++++++++++
.../csvreaderstep/BoundedDataStream.java | 126 +++++++++++++++++++
.../csvreaderstep/CustomDataStream.java | 126 -------------------
.../csvreaderstep/UnivocityCsvParser.java | 43 ++++---
19 files changed, 374 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
index d537d6e..854ec8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastorage/store/impl/FileFactory.java
@@ -27,6 +27,8 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
+import java.util.zip.GZIPInputStream;
import org.apache.carbondata.core.datastorage.store.FileHolder;
import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
@@ -41,6 +43,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.GzipCodec;
public final class FileFactory {
private static Configuration configuration = null;
@@ -117,36 +120,40 @@ public final class FileFactory {
public static DataInputStream getDataInputStream(String path, FileType fileType)
throws IOException {
- path = path.replace("\\", "/");
- switch (fileType) {
- case LOCAL:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
- case HDFS:
- case VIEWFS:
- Path pt = new Path(path);
- FileSystem fs = FileSystem.get(configuration);
- FSDataInputStream stream = fs.open(pt);
- return new DataInputStream(new BufferedInputStream(stream));
- default:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
- }
+ return getDataInputStream(path, fileType, -1);
}
public static DataInputStream getDataInputStream(String path, FileType fileType, int bufferSize)
throws IOException {
path = path.replace("\\", "/");
+ boolean gzip = path.endsWith(".gz");
+ InputStream stream;
switch (fileType) {
case LOCAL:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+ if (gzip) {
+ stream = new GZIPInputStream(new FileInputStream(path));
+ } else {
+ stream = new FileInputStream(path);
+ }
+ break;
case HDFS:
case VIEWFS:
Path pt = new Path(path);
FileSystem fs = FileSystem.get(configuration);
- FSDataInputStream stream = fs.open(pt, bufferSize);
- return new DataInputStream(new BufferedInputStream(stream));
+ if (bufferSize == -1) {
+ stream = fs.open(pt);
+ } else {
+ stream = fs.open(pt, bufferSize);
+ }
+ if (gzip) {
+ GzipCodec codec = new GzipCodec();
+ stream = codec.createInputStream(stream);
+ }
+ break;
default:
- return new DataInputStream(new BufferedInputStream(new FileInputStream(path)));
+ throw new UnsupportedOperationException("unsupported file system");
}
+ return new DataInputStream(new BufferedInputStream(stream));
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index 57e0f3c..d2ee959 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.SaveMode
import org.apache.carbondata.examples.util.InitForExamples
+// scalastyle:off println
object DataFrameAPIExample {
def main(args: Array[String]) {
@@ -28,7 +29,8 @@ object DataFrameAPIExample {
val sc = cc.sc
import cc.implicits._
- // create a dataframe
+
+ // create a dataframe, it can be from parquet or hive table
val df = sc.parallelize(1 to 1000)
.map(x => ("a", "b", x))
.toDF("c1", "c2", "c3")
@@ -47,10 +49,7 @@ object DataFrameAPIExample {
.load()
val count = in.where($"c3" > 500).select($"*").count()
-
- // scalastyle:off println
println(s"count using dataframe.read: $count")
- // scalastyle:on println
// use SQL to read
cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show
@@ -58,8 +57,10 @@ object DataFrameAPIExample {
// also support an implicit function for easier access
import org.apache.carbondata.spark._
- df.saveAsCarbonFile(Map("tableName" -> "carbon2"))
+ df.saveAsCarbonFile(Map("tableName" -> "carbon2", "compress" -> "true"))
+
cc.sql("SELECT count(*) FROM carbon2 WHERE c3 > 100").show
cc.sql("DROP TABLE IF EXISTS carbon2")
}
}
+// scalastyle:on println
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala b/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
index a18a1e9..cfc5814 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/PerfTest.scala
@@ -21,7 +21,6 @@ import java.io.File
import scala.util.Random
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{CarbonContext, DataFrame, Row, SaveMode, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructType}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
index 34d7736..21377d7 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/util/InitForExamples.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.examples.util
import java.io.File
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.CarbonContext
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index d4282b2..9115d14 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -36,4 +36,6 @@ class CarbonOption(options: Map[String, String]) {
"org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl")
}
+ def compress: String = options.getOrElse("compress", "false")
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
index 2968ae6..c703a81 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/CarbonTextFile.scala
@@ -36,6 +36,8 @@ private[csv] object CarbonTextFile {
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
hadoopConfiguration.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true)
+ hadoopConfiguration.set("io.compression.codecs", "org.apache.hadoop.io.compress.GzipCodec")
+
CarbonDataRDDFactory.configSplitMaxSize(sc, location, hadoopConfiguration)
new NewHadoopRDD[LongWritable, Text](
sc,
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
index 33455b7..cd76651 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/csv/DefaultSource.scala
@@ -16,11 +16,11 @@
*/
package com.databricks.spark.csv.newapi
-import com.databricks.spark.csv.CarbonCsvRelation
-import com.databricks.spark.csv.CsvSchemaRDD
-import com.databricks.spark.csv.util.{ ParserLibs, TextFile, TypeCast }
+import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
+import com.databricks.spark.csv.util.{ParserLibs, TextFile, TypeCast}
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{ DataFrame, SaveMode, SQLContext }
+import org.apache.hadoop.io.compress.{CompressionCodec, GzipCodec}
+import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
@@ -164,9 +164,16 @@ class DefaultSource
} else {
true
}
+
+ val codec: Class[_ <: CompressionCodec] =
+ parameters.getOrElse("codec", "none") match {
+ case "gzip" => classOf[GzipCodec]
+ case _ => null
+ }
+
if (doSave) {
// Only save data when the save mode is not ignore.
- data.saveAsCsvFile(path, parameters)
+ data.saveAsCsvFile(path, parameters, codec)
}
createRelation(sqlContext, parameters, data.schema)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
index 973ec96..b46af53 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/package.scala
@@ -18,12 +18,13 @@
package org.apache.carbondata
import org.apache.hadoop.fs.Path
+import org.apache.spark.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
-package object spark {
+package object spark extends Logging {
implicit class toCarbonDataFrame(dataFrame: DataFrame) {
@@ -43,38 +44,44 @@ package object spark {
// temporary solution: write to csv file, then load the csv into carbon
val tempCSVFolder = s"$storePath/$dbName/$tableName/tempCSV"
- dataFrame.write
- .format(csvPackage)
- .option("header", "true")
- .mode(SaveMode.Overwrite)
- .save(tempCSVFolder)
+ var writer: DataFrameWriter =
+ dataFrame.write
+ .format(csvPackage)
+ .option("header", "false")
+ .mode(SaveMode.Overwrite)
+
+ if (options.compress.equals("true")) {
+ writer = writer.option("codec", "gzip")
+ }
+
+ writer.save(tempCSVFolder)
val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
val tempCSVPath = new Path(tempCSVFolder)
val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
- try {
- cc.sql(makeCreateTableString(dataFrame.schema, options))
-
- // add 'csv' as file extension to all generated part file
+ def countSize(): Double = {
+ var size: Double = 0
val itor = fs.listFiles(tempCSVPath, true)
while (itor.hasNext) {
val f = itor.next()
if (f.getPath.getName.startsWith("part-")) {
- val newPath = s"${ f.getPath.getParent }/${ f.getPath.getName }.csv"
- if (!fs.rename(f.getPath, new Path(newPath))) {
- cc.sql(s"DROP TABLE ${ options.tableName }")
- throw new RuntimeException("File system rename failed when loading data into carbon")
- }
+ size += f.getLen
}
}
+ size
+ }
+
+ try {
+ cc.sql(makeCreateTableString(dataFrame.schema, options))
+ logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
cc.sql(makeLoadString(tableName, tempCSVFolder))
} finally {
fs.delete(tempCSVPath, true)
}
}
- private def csvPackage: String = "com.databricks.spark.csv"
+ private def csvPackage: String = "com.databricks.spark.csv.newapi"
private def convertToCarbonType(sparkType: DataType): String = {
sparkType match {
@@ -107,6 +114,7 @@ package object spark {
s"""
LOAD DATA INPATH '$csvFolder'
INTO TABLE $tableName
+ OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
"""
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8f4bd06..ef2920f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -887,6 +887,8 @@ object CarbonDataRDDFactory extends Logging {
val filePaths = carbonLoadModel.getFactFilePath
hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
+ hadoopConfiguration.set("io.compression.codecs",
+ "org.apache.hadoop.io.compress.GzipCodec")
configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index d68fa86..3580810 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -364,6 +364,7 @@ object GlobalDictionaryUtil extends Logging {
.option("escape", carbonLoadModel.getEscapeChar)
.option("ignoreLeadingWhiteSpace", "false")
.option("ignoreTrailingWhiteSpace", "false")
+ .option("codec", "gzip")
.load(carbonLoadModel.getFactFilePath)
df
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 3183695..6240c7c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -98,7 +98,7 @@ class CarbonSource
sys.error(s"ErrorIfExists mode, path $storePath already exists.")
case (SaveMode.Overwrite, true) =>
val cc = CarbonContext.getInstance(sqlContext.sparkContext)
- cc.sql(s"DROP CUBE IF EXISTS ${ options.dbName }.${ options.tableName }")
+ cc.sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
(true, false)
case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
(true, false)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
index 558bb1c..b683629 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/FileUtils.scala
@@ -43,10 +43,8 @@ object FileUtils extends Logging {
} else if (fileName.startsWith(CarbonCommonConstants.UNDERSCORE) ||
fileName.startsWith(CarbonCommonConstants.POINT)) {
logWarning(s"skip invisible input file: $path")
- } else if (fileName.toLowerCase().endsWith(".csv")) {
- stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
} else {
- logWarning(s"skip input file: $path, because this path doesn't end with '.csv'")
+ stringBuild.append(path.replace('\\', '/')).append(CarbonCommonConstants.COMMA)
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/resources/sample.csv.gz
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/resources/sample.csv.gz b/integration/spark/src/test/resources/sample.csv.gz
new file mode 100644
index 0000000..80513b8
Binary files /dev/null and b/integration/spark/src/test/resources/sample.csv.gz differ
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
index 827aa31..d44c73d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/MultiFilesDataLoagdingTestCase.scala
@@ -49,7 +49,7 @@ class MultiFilesDataLoagdingTestCase extends QueryTest with BeforeAndAfterAll {
sql(s"LOAD DATA LOCAL INPATH '$testData' into table multifile")
checkAnswer(
sql("select count(empno) from multifile"),
- Seq(Row(8))
+ Seq(Row(10))
)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
new file mode 100644
index 0000000..73aa2b0
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/SparkDatasourceSuite.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.File
+
+import org.apache.spark.sql.{Row, DataFrame, SaveMode}
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class SparkDatasourceSuite extends QueryTest with BeforeAndAfterAll {
+
+ var currentDirectory: String = _
+ var df: DataFrame = _
+
+ override def beforeAll {
+ sql("DROP TABLE IF EXISTS carbon1")
+
+ currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+ .getCanonicalPath
+ import implicits._
+ df = sc.parallelize(1 to 1000)
+ .map(x => ("a", "b", x))
+ .toDF("c1", "c2", "c3")
+ }
+
+ test("read and write using CarbonContext") {
+ // save dataframe to carbon file
+ df.write
+ .format("carbondata")
+ .option("tableName", "carbon1")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ val in = read
+ .format("carbondata")
+ .option("tableName", "carbon1")
+ .load()
+
+ assert(in.where("c3 > 500").count() == 500)
+ sql("DROP TABLE IF EXISTS carbon1")
+ }
+
+ test("saveAsCarbon API") {
+ import org.apache.carbondata.spark._
+ df.saveAsCarbonFile(Map("tableName" -> "carbon2"))
+
+ checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
+ sql("DROP TABLE IF EXISTS carbon2")
+ }
+
+ test("saveAsCarbon API using compression") {
+ import org.apache.carbondata.spark._
+ df.saveAsCarbonFile(Map("tableName" -> "carbon2", "compress" -> "true"))
+
+ checkAnswer(sql("SELECT count(*) FROM carbon2 WHERE c3 > 100"), Seq(Row(900)))
+ sql("DROP TABLE IF EXISTS carbon2")
+ }
+
+ override def afterAll {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
new file mode 100644
index 0000000..9280447
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
+
+ var currentDirectory: String = _
+
+ override def beforeAll {
+ sql("DROP TABLE IF EXISTS loadtest")
+ sql(
+ """
+ | CREATE TABLE loadtest(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+ .getCanonicalPath
+ }
+
+ test("test data loading CSV file") {
+ val testData = currentDirectory + "/src/test/resources/sample.csv"
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM loadtest"),
+ Seq(Row(4))
+ )
+ }
+
+ test("test data loading GZIP compressed CSV file") {
+ val testData = currentDirectory + "/src/test/resources/sample.csv.gz"
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table loadtest")
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM loadtest"),
+ Seq(Row(8))
+ )
+ }
+
+ override def afterAll {
+ sql("DROP TABLE loadtest")
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java
new file mode 100644
index 0000000..a9406df
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/BoundedDataStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.csvreaderstep;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Custom reader class to read the data from a file; it will take care of reading
+ * only up to the limit assigned to this class
+ */
+public class BoundedDataStream extends InputStream {
+
+ /**
+ * byte value of the new line character
+ */
+ private static final byte END_OF_LINE_BYTE_VALUE = '\n';
+
+ /**
+ * number of extra characters to read
+ */
+ private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
+
+ /**
+ * number of bytes remaining
+ */
+ private long remaining;
+ /**
+ * to check whether end of line is found
+ */
+ private boolean endOfLineFound = false;
+
+ private DataInputStream in;
+
+ public BoundedDataStream(DataInputStream in, long limit) {
+ this.in = in;
+ this.remaining = limit;
+ }
+
+ /**
+ * Below method will be used to read the data from file
+ *
+ * @throws IOException
+ * problem while reading
+ */
+ @Override
+ public int read() throws IOException {
+ if (this.remaining == 0) {
+ return -1;
+ } else {
+ int var1 = this.in.read();
+ if (var1 >= 0) {
+ --this.remaining;
+ }
+
+ return var1;
+ }
+ }
+
+ /**
+ * Below method will be used to read the data from the file. If the limit is
+ * reached, it will keep reading until a newline character is found
+ *
+ * @param buffer
+ * buffer in which data will be read
+ * @param offset
+ * from position to buffer will be filled
+ * @param length
+ * number of character to be read
+ * @throws IOException
+ * problem while reading
+ */
+ @Override
+ public int read(byte[] buffer, int offset, int length) throws IOException {
+ if (this.remaining == 0) {
+ return -1;
+ } else {
+ if (this.remaining < length) {
+ length = (int) this.remaining;
+ }
+
+ length = this.in.read(buffer, offset, length);
+ if (length >= 0) {
+ this.remaining -= length;
+ if (this.remaining == 0 && !endOfLineFound) {
+ endOfLineFound = true;
+ this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+ } else if (endOfLineFound) {
+ int end = offset + length;
+ for (int i = offset; i < end; i++) {
+ if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
+ this.remaining = 0;
+ return (i - offset) + 1;
+ }
+ }
+ this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
+ }
+ }
+ return length;
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java
deleted file mode 100644
index 0023dda..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/CustomDataStream.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.processing.csvreaderstep;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Custom reader class to read the data from file it will take care of reading
- * till the limit assigned to this class
- */
-public class CustomDataStream extends InputStream {
-
- /**
- * byte value of the new line character
- */
- private static final byte END_OF_LINE_BYTE_VALUE = '\n';
-
- /**
- * number of extra character to read
- */
- private static final int NUMBER_OF_EXTRA_CHARACTER_TO_READ = 100;
-
- /**
- * number of bytes remaining
- */
- private long remaining;
- /**
- * to check whether end of line is found
- */
- private boolean endOfLineFound = false;
-
- private DataInputStream in;
-
- public CustomDataStream(DataInputStream in, long limit) {
- this.in = in;
- this.remaining = limit;
- }
-
- /**
- * Below method will be used to read the data from file
- *
- * @throws IOException
- * problem while reading
- */
- @Override
- public int read() throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- int var1 = this.in.read();
- if (var1 >= 0) {
- --this.remaining;
- }
-
- return var1;
- }
- }
-
- /**
- * Below method will be used to read the data from file. If limit reaches in
- * that case it will read until new line character is reached
- *
- * @param buffer
- * buffer in which data will be read
- * @param offset
- * from position to buffer will be filled
- * @param length
- * number of character to be read
- * @throws IOException
- * problem while reading
- */
- @Override
- public int read(byte[] buffer, int offset, int length) throws IOException {
- if (this.remaining == 0) {
- return -1;
- } else {
- if (this.remaining < length) {
- length = (int) this.remaining;
- }
-
- length = this.in.read(buffer, offset, length);
- if (length >= 0) {
- this.remaining -= length;
- if (this.remaining == 0 && !endOfLineFound) {
- endOfLineFound = true;
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- } else if (endOfLineFound) {
- int end = offset + length;
- for (int i = offset; i < end; i++) {
- if (buffer[i] == END_OF_LINE_BYTE_VALUE) {
- this.remaining = 0;
- return (i - offset) + 1;
- }
- }
- this.remaining += NUMBER_OF_EXTRA_CHARACTER_TO_READ;
- }
- }
- return length;
- }
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- in.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/952ba386/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
index 630586a..49f17dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvreaderstep/UnivocityCsvParser.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
@@ -40,6 +42,8 @@ import org.apache.hadoop.util.LineReader;
*/
public class UnivocityCsvParser {
+ private LogService LOGGER = LogServiceFactory.getLogService(this.getClass().getName());
+
/**
* Max number of columns that will be parsed for a row by univocity parsing
*/
@@ -137,25 +141,28 @@ public class UnivocityCsvParser {
// if already one input stream is open first we need to close and then
// open new stream
close();
- // get the block offset
- long startOffset = this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockOffset();
- FileType fileType = FileFactory
- .getFileType(this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath());
- // calculate the end offset the block
- long endOffset =
- this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockLength() + startOffset;
-
- // create a input stream for the block
- DataInputStream dataInputStream = FileFactory
- .getDataInputStream(this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath(),
- fileType, bufferSize, startOffset);
- // if start offset is not 0 then reading then reading and ignoring the extra line
- if (startOffset != 0) {
- LineReader lineReader = new LineReader(dataInputStream, 1);
- startOffset += lineReader.readLine(new Text(), 0);
+
+ String path = this.csvParserVo.getBlockDetailsList().get(blockCounter).getFilePath();
+ FileType fileType = FileFactory.getFileType(path);
+
+ if (path.endsWith(".gz")) {
+ DataInputStream dataInputStream = FileFactory.getDataInputStream(path, fileType, bufferSize);
+ inputStreamReader = new BufferedReader(new InputStreamReader(dataInputStream));
+ } else {
+ long startOffset = this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockOffset();
+ long blockLength = this.csvParserVo.getBlockDetailsList().get(blockCounter).getBlockLength();
+ long endOffset = blockLength + startOffset;
+
+ DataInputStream dataInputStream =
+ FileFactory.getDataInputStream(path, fileType, bufferSize, startOffset);
+ // if start offset is not 0 then reading then reading and ignoring the extra line
+ if (startOffset != 0) {
+ LineReader lineReader = new LineReader(dataInputStream, 1);
+ startOffset += lineReader.readLine(new Text(), 0);
+ }
+ inputStreamReader = new BufferedReader(new InputStreamReader(
+ new BoundedDataStream(dataInputStream, endOffset - startOffset)));
}
- inputStreamReader = new BufferedReader(new InputStreamReader(
- new CustomDataStream(dataInputStream, endOffset - startOffset)));
}
/**
[2/2] incubator-carbondata git commit: [CARBONDATA-188] Compress CSV
file before loading This closes #104
Posted by ra...@apache.org.
[CARBONDATA-188] Compress CSV file before loading This closes #104
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/00518622
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/00518622
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/00518622
Branch: refs/heads/master
Commit: 005186223228aa18fef9475db1f2cd82bbff8a82
Parents: c7999c1 952ba38
Author: ravipesala <ra...@gmail.com>
Authored: Tue Sep 6 20:10:41 2016 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Sep 6 20:10:41 2016 +0530
----------------------------------------------------------------------
.../datastorage/store/impl/FileFactory.java | 41 +++---
.../examples/DataFrameAPIExample.scala | 11 +-
.../apache/carbondata/examples/PerfTest.scala | 1 -
.../examples/util/InitForExamples.scala | 1 -
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../carbondata/spark/csv/CarbonTextFile.scala | 2 +
.../carbondata/spark/csv/DefaultSource.scala | 17 ++-
.../org/apache/carbondata/spark/package.scala | 40 +++---
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +
.../spark/util/GlobalDictionaryUtil.scala | 1 +
.../spark/sql/CarbonDatasourceRelation.scala | 2 +-
.../scala/org/apache/spark/util/FileUtils.scala | 4 +-
.../spark/src/test/resources/sample.csv.gz | Bin 0 -> 106 bytes
.../MultiFilesDataLoagdingTestCase.scala | 2 +-
.../dataload/SparkDatasourceSuite.scala | 81 ++++++++++++
.../dataload/TestLoadDataGeneral.scala | 66 ++++++++++
.../csvreaderstep/BoundedDataStream.java | 126 +++++++++++++++++++
.../csvreaderstep/CustomDataStream.java | 126 -------------------
.../csvreaderstep/UnivocityCsvParser.java | 43 ++++---
19 files changed, 374 insertions(+), 194 deletions(-)
----------------------------------------------------------------------