You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/26 05:36:38 UTC

[carbondata] branch master updated: [CARBONDATA-3448] Fix wrong results in preaggregate query with spark adaptive execution

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b2ef53  [CARBONDATA-3448] Fix wrong results in preaggregate query with spark adaptive execution
9b2ef53 is described below

commit 9b2ef53aef9a823f8007b7d3b042f634e7d874ca
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Fri Jun 21 10:35:06 2019 +0530

    [CARBONDATA-3448] Fix wrong results in preaggregate query with spark adaptive execution
    
    problem: Wrong results in preaggregate query with spark adaptive execution
    
    Spark2TestQueryExecutor.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
    
    cause: For preaggregate, segment info is set into a threadLocal. When adaptive execution is enabled, spark calls getInternalPartition in
    another thread where the updated segment conf is not set. Hence it is not using the updated segments.
    
    solution: CarbonScanRDD already holds the sessionInfo; use it instead of taking session info from the current thread.
    
    This closes #3303
---
 .../preaggregate/TestPreAggregateLoad.scala        | 29 ++++++++++++++++++++++
 .../carbondata/spark/rdd/CarbonScanRDD.scala       | 16 +++++++++---
 2 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 7ba8300..75d71ec 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -18,6 +18,8 @@
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.Spark2TestQueryExecutor
 import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -298,6 +300,33 @@ class TestPreAggregateLoad extends SparkQueryTest with BeforeAndAfterAll with Be
     checkAnswer(sql("select * from maintable_preagg_sum"), Row(1, 52, "xyz"))
   }
 
+  test("test pregarregate with spark adaptive execution ") {
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.3")) {
+      // enable adaptive execution
+      Spark2TestQueryExecutor.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+    }
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id, sum(age) from maintable group by id,name"""
+        .stripMargin)
+    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 20)")
+    sql(s"insert into maintable values(1, 'xyz', 'bengaluru', 30)")
+
+    checkAnswer(sql("select id, sum(age) from maintable group by id, name"), Row(1, 50))
+    sql("drop datamap preagg_sum on table maintable")
+    sql("drop table maintable")
+    if (Spark2TestQueryExecutor.spark.version.startsWith("2.3")) {
+      // disable adaptive execution
+      Spark2TestQueryExecutor.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
+    }
+  }
+
+
 test("check load and select for avg double datatype") {
   sql("drop table if exists maintbl ")
   sql("create table maintbl(year int,month int,name string,salary double) stored by 'carbondata' tblproperties('sort_scope'='Global_sort','table_blocksize'='23','sort_columns'='month,year,name')")
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index b62a7e2..f90d279 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -654,7 +654,6 @@ class CarbonScanRDD[T: ClassTag](
     CarbonInputFormat.setColumnProjection(conf, columnProjection)
     CarbonInputFormatUtil.setDataMapJobIfConfigured(conf)
     // when validate segments is disabled in thread local update it to CarbonTableInputFormat
-    val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
       val tableUniqueKey = identifier.getDatabaseName + "." + identifier.getTableName
       val validateInputSegmentsKey = CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
@@ -675,9 +674,18 @@ class CarbonScanRDD[T: ClassTag](
               CarbonProperties.getInstance().getProperty(inputSegmentsKey, "*")))
       if (queryOnPreAggStreaming) {
         CarbonInputFormat.setAccessStreamingSegments(conf, queryOnPreAggStreaming)
-        carbonSessionInfo.getThreadParams.removeProperty(queryOnPreAggStreamingKey)
-        carbonSessionInfo.getThreadParams.removeProperty(inputSegmentsKey)
-        carbonSessionInfo.getThreadParams.removeProperty(validateInputSegmentsKey)
+        // union for streaming preaggregate can happen concurrently from spark.
+        // Need to clean both maintable and aggregate table segments
+        var keyList = scala.collection.immutable.List[String]()
+        carbonSessionInfo.getThreadParams.getAll.asScala.foreach {
+          case (key, value) =>
+            if (key.contains(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS) ||
+                key.contains(CarbonCommonConstantsInternal.QUERY_ON_PRE_AGG_STREAMING) ||
+                key.contains(CarbonCommonConstants.CARBON_INPUT_SEGMENTS)) {
+              keyList ::= key
+            }
+        }
+        keyList.foreach(key => carbonSessionInfo.getThreadParams.removeProperty(key))
       }
     }
     format