You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/12/17 14:16:45 UTC

[03/21] carbondata git commit: [CARBONDATA-3154] Fix spark-2.1 test error

[CARBONDATA-3154] Fix spark-2.1 test error

Spark 2.2.1 supports the "location" clause in "create table", but Spark 2.1.0
does not; it only supports options(path 'your file path').
So for Spark 2.1.0 the tests should use options(path ...) instead of location,
and create the target directory before running "create table".

This closes #2981


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3524f51d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3524f51d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3524f51d

Branch: refs/heads/branch-1.5
Commit: 3524f51df087059f1fa7ccd87f2082e4d2d36c20
Parents: 442e244
Author: xubo245 <xu...@huawei.com>
Authored: Sat Dec 8 00:01:43 2018 +0800
Committer: Raghunandan S <ca...@gmail.com>
Committed: Mon Dec 17 18:50:23 2018 +0530

----------------------------------------------------------------------
 .../datasource/SparkCarbonDataSourceTest.scala  | 112 +++++++++++++++----
 1 file changed, 91 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3524f51d/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index c5d6a8c..470e0bf 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -19,13 +19,14 @@ package org.apache.spark.sql.carbondata.datasource
 
 import java.io.File
 import java.util
-import java.util.Arrays
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
+import org.apache.spark.util.SparkUtil
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -998,9 +999,19 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
         i += 1
       }
       writer.close()
-      spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
-                "byte, floatfield: float>) " +
-                s"using carbon location '$path'")
+      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+        if (!FileFactory.isFileExist(path)) {
+          FileFactory.createDirectoryAndSetPermission(path,
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+        }
+        spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
+          "byte, floatfield: float>) " +
+          s"using carbon options(path '$path')")
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+        spark.sql("create table complextable (stringfield string, structfield struct<bytefield: " +
+          "byte, floatfield: float>) " +
+          s"using carbon location '$path'")
+      }
     } catch {
       case ex: Exception => throw new RuntimeException(ex)
       case _ => None
@@ -1052,13 +1063,33 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       writer.close()
       spark.sql("drop table if exists sorted_par")
       spark.sql("drop table if exists sort_table")
-      spark.sql(s"create table sort_table (age int, height double, name string, address string," +
-                s" salary long, bytefield byte) using carbon location '$path'")
-      FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
-      spark.sql(s"create table sorted_par(age int, height double, name string, address " +
-                s"string," +
-                s"salary long, bytefield byte) using parquet location " +
-                s"'$warehouse1/../warehouse2'")
+      val path2 = s"$warehouse1/../warehouse2";
+      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+        if (!FileFactory.isFileExist(path)) {
+          FileFactory.createDirectoryAndSetPermission(path,
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+        }
+        spark.sql(s"create table sort_table (age int, height double, name string, address string," +
+          s" salary long, bytefield byte) using carbon  options(path '$path')")
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+        if (!FileFactory.isFileExist(path2)) {
+          FileFactory.createDirectoryAndSetPermission(path2,
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+        }
+        spark.sql(s"create table sorted_par(age int, height double, name string, address " +
+          s"string," +
+          s"salary long, bytefield byte) using parquet options(path " +
+          s"'$path2')")
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+        spark.sql(s"create table sort_table (age int, height double, name string, address string," +
+          s" salary long, bytefield byte) using carbon location '$path'")
+        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$warehouse1/../warehouse2"))
+        spark.sql(s"create table sorted_par(age int, height double, name string, address " +
+          s"string," +
+          s"salary long, bytefield byte) using parquet location " +
+          s"'$warehouse1/../warehouse2'")
+      }
+
       (0 to 10).foreach {
         i =>
           spark.sql(s"insert into sorted_par select '$i', ${ i.toDouble / 2 }, 'name$i', " +
@@ -1098,10 +1129,21 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
         i += 1
       }
       writer.close()
-      spark.sql(s"create table complextable (stringfield string, bytearray " +
-                s"array<byte>, floatarray array<float>) using carbon " +
-                s"location " +
-                s"'$path'")
+      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+        if (!FileFactory.isFileExist(path)) {
+          FileFactory.createDirectoryAndSetPermission(path,
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+        }
+        spark.sql(s"create table complextable (stringfield string, bytearray " +
+          s"array<byte>, floatarray array<float>) using carbon " +
+          s"options( path " +
+          s"'$path')")
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+        spark.sql(s"create table complextable (stringfield string, bytearray " +
+          s"array<byte>, floatarray array<float>) using carbon " +
+          s"location " +
+          s"'$path'")
+      }
     } catch {
       case ex: Exception => throw new RuntimeException(ex)
       case _ => None
@@ -1123,9 +1165,20 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
   private def createParquetTable {
     val path = FileFactory.getUpdatedFilePath(s"$warehouse1/../warehouse2")
     FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(s"$path"))
-    spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
-              s"string," +
-              s"salary long, floatField float, bytefield byte) using parquet location '$path'")
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      if (!FileFactory.isFileExist(path)) {
+        FileFactory.createDirectoryAndSetPermission(path,
+          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+      }
+      spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using parquet options(path '$path')")
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      spark.sql(s"create table par_table(male boolean, age int, height double, name string, address " +
+        s"string," +
+        s"salary long, floatField float, bytefield byte) using parquet location '$path'")
+    }
+
     (0 to 10).foreach {
       i => spark.sql(s"insert into par_table select 'true','$i', ${i.toDouble / 2}, 'name$i', " +
                      s"'address$i', ${i*100}, $i.$i, '$i'")
@@ -1237,7 +1290,15 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     val rowCount = 3
     buildStructSchemaWithNestedArrayOfMapTypeAsValue(writerPath, rowCount)
     spark.sql("drop table if exists carbon_external")
-    spark.sql(s"create table carbon_external using carbon location '$writerPath'")
+    if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+      if (!FileFactory.isFileExist(writerPath)) {
+        FileFactory.createDirectoryAndSetPermission(writerPath,
+          new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+      }
+      spark.sql(s"create table carbon_external using carbon options(path '$writerPath')")
+    } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+      spark.sql(s"create table carbon_external using carbon location '$writerPath'")
+    }
     assert(spark.sql("select * from carbon_external").count() == rowCount)
     spark.sql("drop table if exists carbon_external")
   }
@@ -1268,8 +1329,17 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
         i += 1
       }
       writer.close()
-      spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
-                s"'$path'")
+      if (SparkUtil.isSparkVersionEqualTo("2.1")) {
+        if (!FileFactory.isFileExist(path)) {
+          FileFactory.createDirectoryAndSetPermission(path,
+            new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL))
+        }
+        spark.sql(s"create table multi_page (a string, b float, c byte) using carbon options(path " +
+          s"'$path')")
+      } else if (SparkUtil.isSparkVersionXandAbove("2.2")) {
+        spark.sql(s"create table multi_page (a string, b float, c byte) using carbon location " +
+          s"'$path'")
+      }
       assert(spark.sql("select * from multi_page").count() == 33000)
     } catch {
       case ex: Exception => throw new RuntimeException(ex)