Posted to commits@carbondata.apache.org by in...@apache.org on 2021/10/08 12:32:40 UTC

[carbondata] branch master updated: [CARBONDATA-4215] Fix query issue after add segment other formats with vector read disabled

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b3d78b  [CARBONDATA-4215] Fix query issue after add segment other formats with vector read disabled
8b3d78b is described below

commit 8b3d78b9114aa74b7f9fd5e4f097dd5d279df18a
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Tue Sep 28 19:18:11 2021 +0530

    [CARBONDATA-4215] Fix query issue after add segment other formats with vector read disabled
    
    Why is this PR needed?
    If carbon.enable.vector.reader is disabled and parquet/orc segments are added
    to a carbon table, queries then fail with java.lang.ClassCastException:
    org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to
    org.apache.spark.sql.catalyst.InternalRow. When the vector reader property is
    disabled, supportBatch of ColumnarBatchScan is overridden to false during the
    scan, but supportBatch of an external file format such as ParquetFileFormat is
    not overridden and keeps its default of true.
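
    A minimal reproduction sketch, assuming a SparkSession `spark` with CarbonData
    configured, an existing carbon table, and a parquet folder on disk (the table
    name and path below are illustrative, not taken from this commit):

        // Disable vector reading for the session, mirroring sqlContext.setConf in the test.
        spark.conf.set("carbon.enable.vector.reader", "false")
        // Register an externally written parquet folder as a segment of the carbon table.
        spark.sql("alter table carbon_tbl add segment " +
          "options('path'='/tmp/parquet_seg', 'format'='parquet')")
        // Before this fix, the query below failed with:
        // java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch
        //   cannot be cast to org.apache.spark.sql.catalyst.InternalRow
        spark.sql("select count(*) from carbon_tbl").show()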
    
    What changes were proposed in this PR?
    Override supportBatch of the external file formats based on the
    carbon.enable.vector.reader property.
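
    In essence, each external format is wrapped so that batch support becomes the
    AND of Spark's own decision and the vector reader flag. A condensed sketch of
    the parquet wrapper (the constructor parameter is renamed here for clarity;
    the actual classes live in MixedFormatHandler in the diff below):

        import org.apache.spark.sql.SparkSession
        import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
        import org.apache.spark.sql.types.StructType

        // Batch (columnar) reads stay enabled only when Spark supports them for
        // this schema AND the carbon vector reader property is enabled.
        class ExtendedParquetFileFormat(enableVector: Boolean) extends ParquetFileFormat {
          override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
            super.supportBatch(sparkSession, schema) && enableVector
          }
        }

    This way batching is never forced on for schemas Spark itself would not read
    in batches, and it is switched off whenever carbon.enable.vector.reader is
    false, so the scan produces the InternalRow objects the non-vector path expects.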
    
    This closes #4226
---
 .../execution/strategy/CarbonSourceStrategy.scala  |  3 ++-
 .../execution/strategy/MixedFormatHandler.scala    | 18 +++++++++++---
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 28 ++++++++++++++++++++++
 pom.xml                                            |  1 +
 4 files changed, 46 insertions(+), 4 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
index ed0da08..009d8c7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonSourceStrategy.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.strategy.CarbonPlanHelper.vectorReaderEnabled
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
 
@@ -201,7 +202,7 @@ private[sql] object CarbonSourceStrategy extends SparkStrategy {
       new TableStatusReadCommittedScope(table.identifier, FileFactory.getConfiguration)
     val extraSegments = MixedFormatHandler.extraSegments(table.identifier, readCommittedScope)
     val extraRDD = MixedFormatHandler.extraRDD(relation, rawProjects, filterPredicates,
-      readCommittedScope, table.identifier, extraSegments)
+      readCommittedScope, table.identifier, extraSegments, vectorReaderEnabled())
     val vectorPushRowFilters =
       vectorPushRowFiltersEnabled(relationPredicates, extraSegments.nonEmpty)
     var directScanSupport = !vectorPushRowFilters
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
index 93f1aba..66b7215 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/MixedFormatHandler.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.carbondata.execution.datasources.SparkCarbonFileForm
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, NamedExpression}
-import org.apache.spark.sql.execution.{CodegenSupport, DataSourceScanExec, FilterExec, ProjectExec, SparkPlan}
+import org.apache.spark.sql.execution.{DataSourceScanExec, FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, InMemoryFileIndex, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -217,9 +217,9 @@ object MixedFormatHandler {
 
   def getFileFormat(fileFormat: FileFormatName, supportBatch: Boolean = true): FileFormat = {
     if (fileFormat.equals(new FileFormatName("parquet"))) {
-      new ParquetFileFormat
+      new ExtendedParquetFileFormat(supportBatch)
     } else if (fileFormat.equals(new FileFormatName("orc"))) {
-      new OrcFileFormat
+      new ExtendedOrcFileFormat(supportBatch)
     } else if (fileFormat.equals(new FileFormatName("json"))) {
       new JsonFileFormat
     } else if (fileFormat.equals(new FileFormatName("csv"))) {
@@ -231,6 +231,18 @@ object MixedFormatHandler {
     }
   }
 
+  private class ExtendedParquetFileFormat(supportBatch: Boolean) extends ParquetFileFormat {
+    override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+      super.supportBatch(sparkSession, schema) && supportBatch
+    }
+  }
+
+  private class ExtendedOrcFileFormat(supportBatch: Boolean) extends OrcFileFormat {
+    override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
+      super.supportBatch(sparkSession, schema) && supportBatch
+    }
+  }
+
   /**
    * Generates the RDD using the spark file format.
    */
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 5d2134f..0fd798b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -446,6 +446,34 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
+  test("Test add segment with different formats and vector reader disabled") {
+    sqlContext.setConf("carbon.enable.vector.reader", "false")
+    createCarbonTable()
+    createParquetTable()
+    createOrcTable()
+
+    val newPath1 = copyseg("addsegment2", "addsegtest1")
+    val newPath2 = copyseg("addsegment3", "addsegtest2")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+
+    sql("alter table addsegment1 add segment " +
+        s"options('path'='$newPath1', 'format'='parquet')").collect()
+    sql(s"alter table addsegment1 add segment options('path'='$newPath2', 'format'='orc')")
+      .collect()
+    assert(sql("select empname, designation, doj, workgroupcategory, " +
+               "workgroupcategoryname   from addsegment1").collect().length == 30)
+    checkAnswer(sql("select empname from addsegment1 where empname='arvind'"),
+      Seq(Row("arvind"), Row("arvind"), Row("arvind")))
+    checkAnswer(sql("select count(empname) from addsegment1"), Seq(Row(30)))
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(30)))
+    assert(sql("select deptname, deptno from addsegment1 where empname = 'arvind'")
+             .collect().length == 3)
+    FileFactory.deleteAllFilesOfDir(new File(newPath1))
+    FileFactory.deleteAllFilesOfDir(new File(newPath2))
+    sqlContext.setConf("carbon.enable.vector.reader",
+      CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
+  }
+
   test("Test update/delete blocking on mixed format segments") {
     createCarbonTable()
     createParquetTable()
diff --git a/pom.xml b/pom.xml
index 4ba43b0..da2fb41 100644
--- a/pom.xml
+++ b/pom.xml
@@ -469,6 +469,7 @@
           <findbugsXmlOutput>true</findbugsXmlOutput>
           <xmlOutput>true</xmlOutput>
           <effort>Max</effort>
+          <maxHeap>1024</maxHeap>
         </configuration>
         <executions>
           <execution>