You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/02/01 07:12:37 UTC

carbondata git commit: [CARBONDATA-2111] Fix the decoder issue when multiple joins are present in the TPCH query

Repository: carbondata
Updated Branches:
  refs/heads/master 7ed144c53 -> c9a02fc2a


[CARBONDATA-2111] Fix the decoder issue when multiple joins are present in the TPCH query

Problem
The TPCh query which has multiple joins fails to return any rows

Analysis
It is because of decode of dictionary columns not happening for some joins in case of self join and same column is used for join multiple times.

Solution
If project list attributes are not present as part of the decoder to be decoded attributes then add them to notDecodeCarryForward list, otherwise
there is a chance of skipping decoding of those columns in case of join case.If left and right plans both use same attribute but from the left
side it is not decoded and right side it is decoded then we should decide based on the above project list plan.

This closes #1895


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/c9a02fc2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/c9a02fc2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/c9a02fc2

Branch: refs/heads/master
Commit: c9a02fc2a8389288085fae4ba5d7375d11de22ff
Parents: 7ed144c
Author: ravipesala <ra...@gmail.com>
Authored: Wed Jan 31 18:39:05 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Thu Feb 1 12:43:48 2018 +0530

----------------------------------------------------------------------
 .../allqueries/AllDataTypesTestCase.scala       | 47 +++++++++++++++++++-
 .../CarbonDecoderOptimizerHelper.scala          |  4 ++
 .../sql/optimizer/CarbonLateDecodeRule.scala    | 26 +++++++++++
 3 files changed, 76 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9a02fc2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase.scala
index e739091..afff2d0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCase.scala
@@ -17,8 +17,10 @@
 
 package org.apache.carbondata.spark.testsuite.allqueries
 
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.catalyst.plans.logical.Join
+import org.apache.spark.sql.{CarbonDictionaryCatalystDecoder, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
@@ -1154,4 +1156,47 @@ class AllDataTypesTestCase extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  test("TPCH query issue with not joining with decoded values") {
+
+    sql("drop table if exists SUPPLIER")
+    sql("drop table if exists PARTSUPP")
+    sql("drop table if exists CUSTOMER")
+    sql("drop table if exists NATION")
+    sql("drop table if exists REGION")
+    sql("drop table if exists PART")
+    sql("drop table if exists LINEITEM")
+    sql("drop table if exists ORDERS")
+    sql("create table if not exists SUPPLIER(S_COMMENT string,S_SUPPKEY string,S_NAME string, S_ADDRESS string, S_NATIONKEY string, S_PHONE string, S_ACCTBAL double) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_EXCLUDE'='S_COMMENT, S_SUPPKEY, S_NAME, S_ADDRESS, S_NATIONKEY, S_PHONE','table_blocksize'='300','SORT_COLUMNS'='')")
+    sql("create table if not exists PARTSUPP (  PS_PARTKEY int,  PS_SUPPKEY  string,  PS_AVAILQTY  int,  PS_SUPPLYCOST  double,  PS_COMMENT  string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_EXCLUDE'='PS_SUPPKEY,PS_COMMENT', 'table_blocksize'='300', 'no_inverted_index'='PS_SUPPKEY, PS_COMMENT','SORT_COLUMNS'='')")
+    sql("create table if not exists CUSTOMER(  C_MKTSEGMENT string,  C_NATIONKEY string,  C_CUSTKEY string,  C_NAME string,  C_ADDRESS string,  C_PHONE string,  C_ACCTBAL double,  C_COMMENT string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_INCLUDE'='C_MKTSEGMENT,C_NATIONKEY','DICTIONARY_EXCLUDE'='C_CUSTKEY,C_NAME,C_ADDRESS,C_PHONE,C_COMMENT', 'table_blocksize'='300', 'no_inverted_index'='C_CUSTKEY,C_NAME,C_ADDRESS,C_PHONE,C_COMMENT','SORT_COLUMNS'='C_MKTSEGMENT')")
+    sql("create table if not exists NATION (  N_NAME string,  N_NATIONKEY string,  N_REGIONKEY string,  N_COMMENT  string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_INCLUDE'='N_REGIONKEY','DICTIONARY_EXCLUDE'='N_COMMENT', 'table_blocksize'='300','no_inverted_index'='N_COMMENT','SORT_COLUMNS'='N_NAME')")
+    sql("create table if not exists REGION(  R_NAME string,  R_REGIONKEY string,  R_COMMENT string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_INCLUDE'='R_NAME,R_REGIONKEY','DICTIONARY_EXCLUDE'='R_COMMENT', 'table_blocksize'='300','no_inverted_index'='R_COMMENT','SORT_COLUMNS'='R_NAME')")
+    sql("create table if not exists PART(  P_BRAND string,  P_SIZE int,  P_CONTAINER string,  P_TYPE string,  P_PARTKEY INT ,  P_NAME string,  P_MFGR string,  P_RETAILPRICE double,  P_COMMENT string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_INCLUDE'='P_BRAND,P_SIZE,P_CONTAINER,P_MFGR','DICTIONARY_EXCLUDE'='P_NAME, P_COMMENT', 'table_blocksize'='300','no_inverted_index'='P_NAME,P_COMMENT,P_MFGR','SORT_COLUMNS'='P_SIZE,P_TYPE,P_NAME,P_BRAND,P_CONTAINER')")
+    sql("create table if not exists LINEITEM(  L_SHIPDATE date,  L_SHIPMODE string,  L_SHIPINSTRUCT string,  L_RETURNFLAG string,  L_RECEIPTDATE date,  L_ORDERKEY INT ,  L_PARTKEY INT ,  L_SUPPKEY   string,  L_LINENUMBER int,  L_QUANTITY double,  L_EXTENDEDPRICE double,  L_DISCOUNT double,  L_TAX double,  L_LINESTATUS string,  L_COMMITDATE date,  L_COMMENT  string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_INCLUDE'='L_SHIPDATE,L_SHIPMODE,L_SHIPINSTRUCT,L_RECEIPTDATE,L_COMMITDATE,L_RETURNFLAG,L_LINESTATUS','DICTIONARY_EXCLUDE'='L_SUPPKEY, L_COMMENT', 'table_blocksize'='300', 'no_inverted_index'='L_SUPPKEY,L_COMMENT','SORT_COLUMNS'='L_SHIPDATE,L_RETURNFLAG,L_SHIPMODE,L_RECEIPTDATE,L_SHIPINSTRUCT')")
+    sql("create table if not exists ORDERS(  O_ORDERDATE date,  O_ORDERPRIORITY string,  O_ORDERSTATUS string,  O_ORDERKEY int,  O_CUSTKEY string,  O_TOTALPRICE double,  O_CLERK string,  O_SHIPPRIORITY int,  O_COMMENT string) STORED BY 'org.apache.carbondata.format'TBLPROPERTIES ('DICTIONARY_INCLUDE'='O_ORDERDATE,O_ORDERSTATUS','DICTIONARY_EXCLUDE'='O_ORDERPRIORITY, O_CUSTKEY, O_CLERK, O_COMMENT', 'table_blocksize'='300','no_inverted_index'='O_ORDERPRIORITY, O_CUSTKEY, O_CLERK, O_COMMENT', 'SORT_COLUMNS'='O_ORDERDATE')")
+    val df = sql(
+      "select s_acctbal, s_name, n_name, p_partkey, p_mfgr, s_address, s_phone, s_comment from " +
+      "part, supplier, partsupp, nation, region where p_partkey = ps_partkey and s_suppkey = " +
+      "ps_suppkey and p_size = 15 and p_type like '%BRASS' and s_nationkey = n_nationkey and " +
+      "n_regionkey = r_regionkey and r_name = 'EUROPE' and ps_supplycost = ( select min" +
+      "(ps_supplycost) from partsupp, supplier,nation, region where p_partkey = ps_partkey and " +
+      "s_suppkey = ps_suppkey and s_nationkey = n_nationkey and n_regionkey = r_regionkey and " +
+      "r_name = 'EUROPE' ) order by s_acctbal desc, n_name, s_name, p_partkey limit 100")
+
+    val decoders = df.queryExecution.optimizedPlan.collect {
+      case p: CarbonDictionaryCatalystDecoder => p
+    }
+
+    assertResult(5)(decoders.length)
+
+    sql("drop table if exists SUPPLIER")
+    sql("drop table if exists PARTSUPP")
+    sql("drop table if exists CUSTOMER")
+    sql("drop table if exists NATION")
+    sql("drop table if exists REGION")
+    sql("drop table if exists PART")
+    sql("drop table if exists LINEITEM")
+    sql("drop table if exists ORDERS")
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9a02fc2/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
index fee4b66..e055c86 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/optimizer/CarbonDecoderOptimizerHelper.scala
@@ -43,6 +43,9 @@ case class CarbonDictionaryTempDecoder(
     isOuter: Boolean = false,
     aliasMap: Option[CarbonAliasDecoderRelation] = None) extends UnaryNode {
   var processed = false
+  // In case of join plan and project does not include the notDecode attributes then we should not
+  // carry forward to above plan.
+  val notDecodeCarryForward = new util.HashSet[AttributeReferenceWrapper]()
 
   def getAttrsNotDecode: util.Set[Attribute] = {
     val set = new util.HashSet[Attribute]()
@@ -111,6 +114,7 @@ class CarbonDecoderProcessor {
       decoderNotDecode: util.HashSet[AttributeReferenceWrapper]): Unit = {
     scalaList.reverseMap {
       case Node(cd: CarbonDictionaryTempDecoder) =>
+        cd.notDecodeCarryForward.asScala.foreach(decoderNotDecode.remove)
         decoderNotDecode.asScala.foreach(cd.attrsNotDecode.add)
         decoderNotDecode.asScala.foreach(cd.attrList.remove)
         decoderNotDecode.addAll(cd.attrList)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/c9a02fc2/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 764891b..06ad0ad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -522,6 +522,30 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
           }
       }
 
+
+    transFormedPlan transform {
+      // If project list attributes are not present as part of decoder to be decoded attributes
+      // then add them to notDecodeCarryForward list, otherwise there is a chance of skipping
+      // decoding of those columns in case of join case.
+      // If left and right plans both uses same attribute but from left side it is
+      // not decoded and right side it is decoded then we should decide based on the above project
+      // list plan.
+      case project@Project(projectList, child: Join) =>
+        val allAttr = new util.HashSet[AttributeReferenceWrapper]()
+        val allDecoder = child.collect {
+          case cd : CarbonDictionaryTempDecoder =>
+            allAttr.addAll(cd.attrList)
+            cd
+        }
+        if (allDecoder.nonEmpty && !allAttr.isEmpty) {
+          val notForward = allAttr.asScala.filterNot {attrWrapper =>
+            val attr = attrWrapper.attr
+            projectList.exists(f => attr.name.equalsIgnoreCase(f.name) && attr.exprId == f.exprId)
+          }
+          allDecoder.head.notDecodeCarryForward.addAll(notForward.asJava)
+        }
+        project
+    }
     val processor = new CarbonDecoderProcessor
     processor.updateDecoders(processor.getDecoderList(transFormedPlan))
     updateProjection(updateTempDecoder(transFormedPlan, aliasMap, attrMap))
@@ -825,5 +849,7 @@ case class CarbonDecoderRelation(
   }
 
   lazy val dictionaryMap = carbonRelation.carbonRelation.metaData.dictionaryMap
+
+  override def toString: String = carbonRelation.carbonTable.getTableUniqueName
 }