Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/03/26 11:06:43 UTC

[carbondata] branch master updated: [CARBONDATA-3319][TestCase] Added condition to check if datamap exists before caching

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 176d6a7  [CARBONDATA-3319][TestCase] Added condition to check if datamap exists before caching
176d6a7 is described below

commit 176d6a7017d1e1f74121334c09f8c4d0a1ee4b28
Author: Aryan-Khaitan <ar...@gmail.com>
AuthorDate: Tue Mar 19 14:42:39 2019 +0530

    [CARBONDATA-3319][TestCase] Added condition to check if datamap
    exists before caching
    
    1. Syntax error: a closing parenthesis was missing in a subquery in QueriesBVATestCase.scala.
    2. In TestCreateTableUsingSparkCarbonFileFormat.scala,
    clearDataMapCache checks whether the datamap exists before
    deleting it. A new utility class has therefore been added to apply
    the replace rule (CarbonFileIndexReplaceRule), as no replace rule
    was applied earlier for SDV (see the usage sketch below).
    3. The test cases that partition by date use an old style of
    partitioning and have therefore been ignored.
    4. The complex delimiters used have been corrected to "\001" instead of "$".
    
    This closes #3153
---
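
The new DataSourceTestUtil object added in this patch exposes a shared local
SparkSession plus checkAnswer/checkExistence helpers, so the SDV data source
suites can extend FunSuite instead of QueryTest and still have the
CarbonFileIndexReplaceRule applied. Below is a minimal usage sketch mirroring
the changes to SparkCarbonDataSourceTestCase; the suite name, table name and
rows are illustrative only and not part of the patch:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.common.util.DataSourceTestUtil._
    import org.scalatest.{BeforeAndAfterAll, FunSuite}

    class ExampleDataSourceTestCase extends FunSuite with BeforeAndAfterAll {
      // Brings the shared SparkSession members (sql, implicits, ...) into
      // scope, as done in the updated data source test cases.
      import spark._

      override def beforeAll(): Unit = {
        sql("DROP TABLE IF EXISTS example_carbon")
      }

      test("checkAnswer compares carbon results against expected rows") {
        // "USING carbon" assumes the spark-datasource carbon format, as in
        // the existing data source test cases.
        sql("CREATE TABLE example_carbon (id INT, name STRING) USING carbon")
        sql("INSERT INTO example_carbon SELECT 1, 'a'")
        // checkAnswer normalises doubles/decimals and sorts both sides when
        // the plan has no ORDER BY, then asserts equality.
        checkAnswer(sql("SELECT id, name FROM example_carbon"), Seq(Row(1, "a")))
      }

      override def afterAll(): Unit = {
        sql("DROP TABLE IF EXISTS example_carbon")
      }
    }

Because DataSourceTestUtil builds its own local SparkSession and registers
CarbonFileIndexReplaceRule for Spark versions other than 2.1, these suites no
longer depend on QueryTest.
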
 .../cluster/sdv/generated/QueriesBVATestCase.scala |   4 +-
 ...teTableUsingSparkCarbonFileFormatTestCase.scala |   7 +-
 .../datasource/SparkCarbonDataSourceTestCase.scala |  19 +--
 .../spark/sql/common/util/DataSourceTestUtil.scala | 144 +++++++++++++++++++++
 .../TestAllDataTypeForPartitionTable.scala         |   4 +-
 5 files changed, 162 insertions(+), 16 deletions(-)

diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBVATestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBVATestCase.scala
index 130fe08..11c705d 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBVATestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/QueriesBVATestCase.scala
@@ -10697,8 +10697,8 @@ class QueriesBVATestCase extends QueryTest with BeforeAndAfterAll {
   //PushUP_FILTER_test_boundary_TC194
   test("PushUP_FILTER_test_boundary_TC194", Include) {
 
-    checkAnswer(s"""select min(c2_Bigint),max(c2_Bigint),sum(c2_Bigint),avg(c2_Bigint) , count(c2_Bigint), variance(c2_Bigint) from (select c2_Bigint from Test_Boundary where sin(c1_int)=0.18796200317975467 or sin(c1_int)=-0.18796200317975467 order by c2_Bigint""",
-      s"""select min(c2_Bigint),max(c2_Bigint),sum(c2_Bigint),avg(c2_Bigint) , count(c2_Bigint), variance(c2_Bigint) from (select c2_Bigint from Test_Boundary_hive where sin(c1_int)=0.18796200317975467 or sin(c1_int)=-0.18796200317975467 order by c2_Bigint""", "QueriesBVATestCase_PushUP_FILTER_test_boundary_TC194")
+    checkAnswer(s"""select min(c2_Bigint),max(c2_Bigint),sum(c2_Bigint),avg(c2_Bigint) , count(c2_Bigint), variance(c2_Bigint) from (select c2_Bigint from Test_Boundary where sin(c1_int)=0.18796200317975467 or sin(c1_int)=-0.18796200317975467 order by c2_Bigint)""",
+      s"""select min(c2_Bigint),max(c2_Bigint),sum(c2_Bigint),avg(c2_Bigint) , count(c2_Bigint), variance(c2_Bigint) from (select c2_Bigint from Test_Boundary_hive where sin(c1_int)=0.18796200317975467 or sin(c1_int)=-0.18796200317975467 order by c2_Bigint)""", "QueriesBVATestCase_PushUP_FILTER_test_boundary_TC194")
 
   }
 
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
index b96fe10..ecf9ff4 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/CreateTableUsingSparkCarbonFileFormatTestCase.scala
@@ -23,7 +23,8 @@ import java.util.{Date, Random}
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang.RandomStringUtils
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+import org.apache.spark.sql.common.util.DataSourceTestUtil._
 import org.apache.spark.util.SparkUtil
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -37,8 +38,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 
-class CreateTableUsingSparkCarbonFileFormatTestCase extends QueryTest with BeforeAndAfterAll {
-
+class CreateTableUsingSparkCarbonFileFormatTestCase extends FunSuite with BeforeAndAfterAll {
+  import spark._
   override def beforeAll(): Unit = {
     sql("DROP TABLE IF EXISTS sdkOutputTable")
   }
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
index 8f41ba7..abdced7 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/datasource/SparkCarbonDataSourceTestCase.scala
@@ -25,10 +25,10 @@ import org.apache.avro.file.DataFileWriter
 import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
 import org.apache.avro.io.{DecoderFactory, Encoder}
 import org.apache.spark.sql.{AnalysisException, Row}
-import org.apache.spark.sql.common.util.QueryTest
+import org.apache.spark.sql.common.util.DataSourceTestUtil._
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.junit.Assert
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll,FunSuite}
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -37,7 +37,8 @@ import org.apache.carbondata.core.metadata.datatype.{DataTypes, StructField}
 import org.apache.carbondata.hadoop.testutil.StoreCreator
 import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 
-class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
+class SparkCarbonDataSourceTestCase extends FunSuite with BeforeAndAfterAll {
+  import spark._
 
   val warehouse1 = s"${TestQueryExecutor.projectPath}/integration/spark-datasource/target/warehouse"
 
@@ -616,7 +617,7 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
       "double, HQ_DEPOSIT double) row format delimited fields terminated by ',' collection items " +
       "terminated by '$'")
     val sourceFile = FileFactory
-      .getPath(s"$resourcesPath" + "../../../../../spark-datasource/src/test/resources/Array.csv")
+      .getPath(s"$resource" + "../../../../../spark-datasource/src/test/resources/Array.csv")
       .toString
     sql(s"load data local inpath '$sourceFile' into table array_com_hive")
     sql(
@@ -643,7 +644,7 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
       "terminated by '$' map keys terminated by '&'")
     val sourceFile = FileFactory
       .getPath(
-        s"$resourcesPath" + "../../../../../spark-datasource/src/test/resources/structofarray.csv")
+        s"$resource" + "../../../../../spark-datasource/src/test/resources/structofarray.csv")
       .toString
     sql(s"load data local inpath '$sourceFile' into table STRUCT_OF_ARRAY_com_hive")
     sql(
@@ -890,7 +891,7 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
 
       var i = 0
       while (i < 11) {
-        val array = Array[String](s"name$i", s"$i" + "$" + s"$i.${ i }12")
+        val array = Array[String](s"name$i", s"$i" + "\001" + s"$i.${ i }12")
         writer.write(array)
         i += 1
       }
@@ -992,8 +993,8 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
       var i = 0
       while (i < 10) {
         val array = Array[String](s"name$i",
-          s"$i" + "$" + s"${ i * 2 }",
-          s"${ i / 2 }" + "$" + s"${ i / 3 }")
+          s"$i" + "\001" + s"${ i * 2 }",
+          s"${ i / 2 }" + "\001" + s"${ i / 3 }")
         writer.write(array)
         i += 1
       }
@@ -1273,7 +1274,7 @@ class SparkCarbonDataSourceTestCase extends QueryTest with BeforeAndAfterAll {
       " Timestamp,deliveryDate timestamp,deliverycharge double)row format delimited FIELDS " +
       "terminated by ',' LINES terminated by '\n' stored as textfile")
     val sourceFile = FileFactory
-      .getPath(s"$resourcesPath" +
+      .getPath(s"$resource" +
                "../../../../../spark-datasource/src/test/resources/vardhandaterestruct.csv")
       .toString
     sql(s"load data local inpath '$sourceFile' into table fileformat_drop_hive")
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/DataSourceTestUtil.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/DataSourceTestUtil.scala
new file mode 100644
index 0000000..8a5b154
--- /dev/null
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/spark/sql/common/util/DataSourceTestUtil.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.common.util
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+
+import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.util.sideBySide
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
+
+
+object DataSourceTestUtil {
+
+  val rootPath = new File(this.getClass.getResource("/").getPath
+                          + "../../../..").getCanonicalPath
+  val warehouse1 = FileFactory.getPath(s"$rootPath/integration/spark-datasource/target/warehouse")
+    .toString
+  val resource = s"$rootPath/integration/spark-datasource/src/test/resources"
+  val metaStoreDB1 = s"$rootPath/integration/spark-datasource/target"
+  val spark = SparkSession
+    .builder()
+    .enableHiveSupport()
+    .master("local")
+    .config("spark.sql.warehouse.dir", warehouse1)
+    .config("spark.driver.host", "localhost")
+    .config("spark.sql.crossJoin.enabled", "true")
+    .config("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")
+    .getOrCreate()
+  spark.sparkContext.setLogLevel("ERROR")
+  if (!spark.sparkContext.version.startsWith("2.1")) {
+    spark.experimental.extraOptimizations = Seq(new CarbonFileIndexReplaceRule)
+  }
+  CarbonProperties.getInstance()
+    .addProperty(CarbonCommonConstants.CARBON_MINMAX_ALLOWED_BYTE_COUNT, "40")
+
+  def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): Unit = {
+    checkAnswer(df, expectedAnswer.asScala)
+  }
+
+  def checkExistence(df: DataFrame, exists: Boolean, keywords: String*) {
+    val outputs = df.collect().map(_.mkString).mkString
+    for (key <- keywords) {
+      if (exists) {
+        assert(outputs.contains(key), s"Failed for $df ($key doesn't exist in result)")
+      } else {
+        assert(!outputs.contains(key), s"Failed for $df ($key existed in the result)")
+      }
+    }
+  }
+
+  def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
+    checkAnswer(df, expectedAnswer.collect())
+  }
+
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   * If there was exception during the execution or the contents of the DataFrame does not
+   * match the expected result, an error message will be returned. Otherwise, a [[None]] will
+   * be returned.
+   *
+   * @param df             the [[DataFrame]] to be executed
+   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+   */
+  def checkAnswer(df: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+    val isSorted = df.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
+
+    def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
+      // Converts data to types that we can do equality comparison using Scala collections.
+      // For BigDecimal type, the Scala type has a better definition of equality test (similar to
+      // Java's java.math.BigDecimal.compareTo).
+      // For binary arrays, we convert it to Seq to avoid of calling java.util.Arrays.equals for
+      // equality test.
+      val converted: Seq[Row] = answer.map { s =>
+        Row.fromSeq(s.toSeq.map {
+          case d: java.math.BigDecimal => BigDecimal(d)
+          case b: Array[Byte] => b.toSeq
+          case d: Double =>
+            if (!d.isInfinite && !d.isNaN) {
+              var bd = BigDecimal(d)
+              bd = bd.setScale(5, BigDecimal.RoundingMode.UP)
+              bd.doubleValue()
+            }
+            else {
+              d
+            }
+          case o => o
+        })
+      }
+      if (!isSorted) converted.sortBy(_.toString()) else converted
+    }
+
+    val sparkAnswer = try df.collect().toSeq catch {
+      case e: Exception =>
+        val errorMessage =
+          s"""
+             |Exception thrown while executing query:
+             |${ df.queryExecution }
+             |== Exception ==
+             |$e
+             |${ org.apache.spark.sql.catalyst.util.stackTraceToString(e) }
+          """.stripMargin
+        return Some(errorMessage)
+    }
+
+    if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
+      val errorMessage =
+        s"""
+           |Results do not match for query:
+           |${ df.queryExecution }
+           |== Results ==
+           |${
+          sideBySide(
+            s"== Correct Answer - ${ expectedAnswer.size } ==" +:
+            prepareAnswer(expectedAnswer).map(_.toString()),
+            s"== Spark Answer - ${ sparkAnswer.size } ==" +:
+            prepareAnswer(sparkAnswer).map(_.toString())).mkString("\n")
+        }
+      """.stripMargin
+      assert(false, errorMessage)
+    }
+  }
+}
\ No newline at end of file
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
index 54586c2..82c6d48 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAllDataTypeForPartitionTable.scala
@@ -328,7 +328,7 @@ class TestAllDataTypeForPartitionTable extends QueryTest with BeforeAndAfterAll
       Seq(Row(32767, 2147483647, 9223372036854775807L, 2147483648.1, 9223372036854775808.1, BigDecimal("9223372036854775808.1234"), Timestamp.valueOf("2017-06-13 23:59:59"), Date.valueOf("2017-06-13"), "abc3", "abcd3", "abcde3", new mutable.WrappedArray.ofRef[String](Array("a", "b", "c", "3")), Row("a", "b", "3"))))
   }
 
-  test("allTypeTable_hash_date") {
+  ignore("allTypeTable_hash_date") {
     val tableName = "allTypeTable_hash_date"
 
     sql(
@@ -1096,7 +1096,7 @@ class TestAllDataTypeForPartitionTable extends QueryTest with BeforeAndAfterAll
       Seq(Row(32767, 2147483647, 9223372036854775807L, 2147483648.1, 9223372036854775808.1, BigDecimal("9223372036854775808.1234"), Timestamp.valueOf("2017-06-13 23:59:59"), Date.valueOf("2017-06-13"), "abc3", "abcd3", "abcde3", new mutable.WrappedArray.ofRef[String](Array("a", "b", "c", "3")), Row("a", "b", "3"))))
   }
 
-  test("allTypeTable_range_date") {
+  ignore("allTypeTable_range_date") {
     val tableName = "allTypeTable_range_date"
 
     sql(