You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/12/09 07:40:18 UTC
carbondata git commit: [CARBONDATA-3154] Fix spark-2.1 test error
Repository: carbondata
Updated Branches:
refs/heads/master 382ce430a -> d9f1a8115
[CARBONDATA-3154] Fix spark-2.1 test error
Spark 2.2.1 supports the "location" clause in "create table", but Spark 2.1.0
does not; it only supports options(path 'your file path').
So for Spark 2.1.0 we should change location to options(path ...) and
create the target directory before running "create table".
This closes #2981
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d9f1a811
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d9f1a811
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d9f1a811
Branch: refs/heads/master
Commit: d9f1a81154a0b917d16bde8ffba7cf013cc3cf2f
Parents: 382ce43
Author: xubo245 <xu...@huawei.com>
Authored: Sat Dec 8 00:01:43 2018 +0800
Committer: kunal642 <ku...@gmail.com>
Committed: Sun Dec 9 13:09:02 2018 +0530
----------------------------------------------------------------------
.../datasource/SparkCarbonDataSourceTest.scala | 112 +++++++++++++++----
1 file changed, 91 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9f1a811/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index c5d6a8c..470e0bf 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.carbondata.datasource
import java.io.File
import java.util
-import java.util.Arrays
import scala.collection.JavaConverters._
import scala.collection.mutable
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.util.SparkUtil
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -998,9 +999,19 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
i += 1
}
writer.close()
- spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
- "byte, floatfield: float>) " +
- s"using carbon location '$path'")
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ if (!FileFactory.isFileExist(path)) {
+ FileFactory.createDirectoryAndSetPermission(path,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
+ "byte, floatfield: float>) " +
+ s"using carbon options(path '$path')")
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
+ "byte, floatfield: float>) " +
+ s"using carbon location '$path'")
+ }
} catch {
case ex: Exception => throw new RuntimeException(ex)
case _ => None
@@ -1052,13 +1063,33 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
writer.close()
spark.sql("drop table if exists sorted_par")
spark.sql("drop table if exists sort_table")
- spark.sql(s"create table sort_table (age int, height double, name string, address string," +
- s" salary long, bytefield byte) using carbon location '$path'")
- FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
- spark.sql(s"create table sorted_par(age int, height double, name string, address " +
- s"string," +
- s"salary long, bytefield byte) using parquet location " +
- s"'$warehouse1/../warehouse2'")
+ val path2 = s"$warehouse1/../warehouse2";
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ if (!FileFactory.isFileExist(path)) {
+ FileFactory.createDirectoryAndSetPermission(path,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ spark.sql(s"create table sort_table (age int, height double, name string, address string," +
+ s" salary long, bytefield byte) using carbon options(path '$path')")
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+ if (!FileFactory.isFileExist(path2)) {
+ FileFactory.createDirectoryAndSetPermission(path2,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ spark.sql(s"create table sorted_par(age int, height double, name string, address " +
+ s"string," +
+ s"salary long, bytefield byte) using parquet options(path " +
+ s"'$path2')")
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ spark.sql(s"create table sort_table (age int, height double, name string, address string," +
+ s" salary long, bytefield byte) using carbon location '$path'")
+ FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+ spark.sql(s"create table sorted_par(age int, height double, name string, address " +
+ s"string," +
+ s"salary long, bytefield byte) using parquet location " +
+ s"'$warehouse1/../warehouse2'")
+ }
+
(0 to 10).foreach {
i =>
spark.sql(s"insert into sorted_par select '$i', ${ i.toDouble / 2 }, 'name$i', " +
@@ -1098,10 +1129,21 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
i += 1
}
writer.close()
- spark.sql(s"create table complextable (stringfield string, bytearray " +
- s"array<byte>, floatarray array<float>) using carbon " +
- s"location " +
- s"'$path'")
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ if (!FileFactory.isFileExist(path)) {
+ FileFactory.createDirectoryAndSetPermission(path,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ spark.sql(s"create table complextable (stringfield string, bytearray " +
+ s"array<byte>, floatarray array<float>) using carbon " +
+ s"options( path " +
+ s"'$path')")
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ spark.sql(s"create table complextable (stringfield string, bytearray " +
+ s"array<byte>, floatarray array<float>) using carbon " +
+ s"location " +
+ s"'$path'")
+ }
} catch {
case ex: Exception => throw new RuntimeException(ex)
case _ => None
@@ -1123,9 +1165,20 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
private def createParquetTable {
val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2")
FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$path"))
- spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
- s"string," +
- s"salary long, floatField float, bytefield byte) using parquet location '$path'")
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ if (!FileFactory.isFileExist(path)) {
+ FileFactory.createDirectoryAndSetPermission(path,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
+ s"string," +
+ s"salary long, floatField float, bytefield byte) using parquet options(path '$path')")
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
+ s"string," +
+ s"salary long, floatField float, bytefield byte) using parquet location '$path'")
+ }
+
(0 to 10).foreach {
i => spark.sql(s"insert into par_table select 'true','$i', ${i.toDouble / 2}, 'name$i', " +
s"'address$i', ${i*100}, $i.$i, '$i'")
@@ -1237,7 +1290,15 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
val rowCount = 3
buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath, rowCount)
spark.sql("drop table if exists carbon_external")
- spark.sql(s"create table carbon_external using carbon location '$writerPath'")
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ if (!FileFactory.isFileExist(writerPath)) {
+ FileFactory.createDirectoryAndSetPermission(writerPath,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ spark.sql(s"create table carbon_external using carbon options(path '$writerPath')")
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ spark.sql(s"create table carbon_external using carbon location '$writerPath'")
+ }
assert(spark.sql("select * from carbon_external").count() == rowCount)
spark.sql("drop table if exists carbon_external")
}
@@ -1268,8 +1329,17 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
i += 1
}
writer.close()
- spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
- s"'$path'")
+ if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+ if (!FileFactory.isFileExist(path)) {
+ FileFactory.createDirectoryAndSetPermission(path,
+ new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+ }
+ spark.sql(s"create table multi_page (a string, b float, c byte) using carbon options(path " +
+ s"'$path')")
+ } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+ spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
+ s"'$path'")
+ }
assert(spark.sql("select * from multi_page").count() == 33000)
} catch {
case ex: Exception => throw new RuntimeException(ex)