Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/05/15 17:46:24 UTC

[carbondata] branch master updated: [CARBONDATA-3815]Insert into table select from another table throws exception for spatial tables

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 544e1e2  [CARBONDATA-3815]Insert into table select from another table throws exception for spatial tables
544e1e2 is described below

commit 544e1e2f07be9427cef8136f1e76d58b481d5534
Author: Venu Reddy <k....@gmail.com>
AuthorDate: Mon May 11 13:29:14 2020 +0530

    [CARBONDATA-3815]Insert into table select from another table throws exception for spatial tables
    
    Why is this PR needed?
    Insert into table select from another table throws an exception for spatial tables:
    a NoSuchElementException is thrown for the 'mygeohash' column.
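    
    A minimal reproduction, mirroring the new GeoTest cases added below (the table
    name and the INDEX_HANDLER properties are taken from that test; geoTable1 is an
    identically defined spatial table that already holds data):
    
      CREATE TABLE geoTable2(
        timevalue BIGINT,
        longitude LONG,
        latitude LONG)
      STORED AS carbondata
      TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
        'INDEX_HANDLER.mygeohash.type'='geohash',
        'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
        'INDEX_HANDLER.mygeohash.originLatitude'='39.832277',
        'INDEX_HANDLER.mygeohash.gridSize'='50',
        'INDEX_HANDLER.mygeohash.minLongitude'='115.811865',
        'INDEX_HANDLER.mygeohash.maxLongitude'='116.782233',
        'INDEX_HANDLER.mygeohash.minLatitude'='39.832277',
        'INDEX_HANDLER.mygeohash.maxLatitude'='40.225281',
        'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')
    
      INSERT INTO geoTable2 SELECT * FROM geoTable1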
    
    What changes were proposed in this PR?
    Excluded spatial columns in getReArrangedIndexAndSelectedSchema, and set
    carbonLoadModel.setIndexColumnsPresent(true) when spatial columns are
    encountered during rearranging.
    If the target spatial table has sort_scope configured as GLOBAL_SORT, the
    insert flow now goes through loadDataFrame() instead of
    insertDataUsingGlobalSortWithInternalRow() in CarbonDataRDDFactory.loadCarbonData().
    This ensures the load takes the existing path without the conversion step,
    where the spatial column values are regenerated.
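    
    A condensed sketch of the two changes (not the literal code; the complete
    diffs for CarbonInsertIntoCommand.scala and CarbonDataRDDFactory.scala
    follow below):
    
      // CarbonInsertIntoCommand.getReArrangedIndexAndSelectedSchema:
      // skip generated spatial columns, only record their presence
      columnSchema.foreach { col =>
        if (col.isIndexColumn) {
          carbonLoadModel.setIndexColumnsPresent(true)
        } else {
          // existing partition/rearrange handling, unchanged
        }
      }
    
      // CarbonDataRDDFactory.loadCarbonData: keep the internal-row global
      // sort path only when no spatial index columns are present
      if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) &&
          !carbonLoadModel.isIndexColumnsPresent) {
        // insertDataUsingGlobalSortWithInternalRow(...) as before
      } else {
        // falls through to loadDataFrame(), which regenerates the spatial columns
      }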
    
    This closes #3760
---
 .../carbondata/hadoop/api/CarbonInputFormat.java   |  2 +-
 .../org/apache/carbondata/spark/util/Util.java     |  2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala           |  6 +-
 .../datasources/SparkCarbonFileFormat.scala        |  2 +-
 .../management/CarbonInsertIntoCommand.scala       | 31 +++++---
 .../command/management/CommonLoadUtils.scala       |  4 +-
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 92 +++++++++++++++++++---
 .../loading/parser/impl/JsonRowParser.java         |  2 +-
 .../InputProcessorStepWithNoConverterImpl.java     | 15 ++--
 .../carbondata/sdk/file/CarbonSchemaReader.java    |  2 +-
 10 files changed, 118 insertions(+), 40 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 868b52b..5c9e5e1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -858,7 +858,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     List<String> projectColumns = new ArrayList<>();
     // complex type and add just the parent column name while skipping the child columns.
     for (ColumnSchema col : colList) {
-      if (!col.getColumnName().contains(".")) {
+      if (!col.isComplexColumn()) {
         projectColumns.add(col.getColumnName());
       }
     }
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
index 1dc0ee2..4dc1277 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/Util.java
@@ -111,7 +111,7 @@ public class Util {
     List<ColumnSchema> columns = table.getTableInfo().getFactTable().getListOfColumns();
     List<ColumnSchema> validColumnSchema = new ArrayList<>();
     for (ColumnSchema column : columns) {
-      if (!column.isInvisible() && !column.getColumnName().contains(".")) {
+      if (!column.isInvisible() && !column.isIndexColumn() && !column.isComplexColumn()) {
         validColumnSchema.add(column);
       }
     }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 913fe1f..cf1c07c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -374,12 +374,14 @@ object CarbonDataRDDFactory {
               .getTableInfo
               .getFactTable
               .getListOfColumns
-              .asScala.filterNot(col => col.isInvisible || col.getColumnName.contains("."))
+              .asScala
+              .filterNot(col => col.isInvisible || col.isIndexColumn || col.isComplexColumn)
             val convertedRdd = CommonLoadUtils.getConvertedInternalRow(
               colSchema,
               scanResultRdd.get,
               isGlobalSortPartition = false)
-            if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
+            if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT) &&
+                !carbonLoadModel.isIndexColumnsPresent) {
               DataLoadProcessBuilderOnSpark.insertDataUsingGlobalSortWithInternalRow(sqlContext
                 .sparkSession,
                 convertedRdd,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
index f964490..a9a01ee 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/carbondata/execution/datasources/SparkCarbonFileFormat.scala
@@ -103,7 +103,7 @@ class SparkCarbonFileFormat extends FileFormat
     var schema = new StructType
     val fields = tableInfo.getFactTable.getListOfColumns.asScala.map { col =>
       // TODO find better way to know its a child
-      if (!col.getColumnName.contains(".")) {
+      if (!col.isComplexColumn) {
         Some((col.getSchemaOrdinal,
           StructField(col.getColumnName,
             SparkTypeConverter.convertCarbonToSparkDataType(col, table))))
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index ff57b0b..5fb19ae 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -161,9 +161,9 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
         null
       }
     val convertedStaticPartition = getConvertedStaticPartitionMap(partitionColumnSchema)
-    val (reArrangedIndex, selectedColumnSchema) = getReArrangedIndexAndSelectedSchema(
-      tableInfo,
-      partitionColumnSchema)
+    val (reArrangedIndex, selectedColumnSchema) = getReArrangedIndexAndSelectedSchema(tableInfo,
+      partitionColumnSchema,
+      carbonLoadModel)
     val newLogicalPlan = getReArrangedLogicalPlan(
       reArrangedIndex,
       selectedColumnSchema,
@@ -468,7 +468,8 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
 
   def getReArrangedIndexAndSelectedSchema(
       tableInfo: TableInfo,
-      partitionColumnSchema: mutable.Buffer[ColumnSchema]): (Seq[Int], Seq[ColumnSchema]) = {
+      partitionColumnSchema: mutable.Buffer[ColumnSchema],
+      carbonLoadModel: CarbonLoadModel): (Seq[Int], Seq[ColumnSchema]) = {
     var reArrangedIndex: Seq[Int] = Seq()
     var selectedColumnSchema: Seq[ColumnSchema] = Seq()
     var partitionIndex: Seq[Int] = Seq()
@@ -495,16 +496,20 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     }
     columnSchema.foreach {
       col =>
-        var skipPartitionColumn = false
-        if (partitionColumnNames != null &&
-            partitionColumnNames.contains(col.getColumnName)) {
-          partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
-          skipPartitionColumn = true
+        if (col.isIndexColumn) {
+          carbonLoadModel.setIndexColumnsPresent(true)
         } else {
-          reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
-        }
-        if (!skipPartitionColumn) {
-          selectedColumnSchema = selectedColumnSchema :+ col
+          var skipPartitionColumn = false
+          if (partitionColumnNames != null &&
+              partitionColumnNames.contains(col.getColumnName)) {
+            partitionIndex = partitionIndex :+ createOrderMap(col.getColumnName)
+            skipPartitionColumn = true
+          } else {
+            reArrangedIndex = reArrangedIndex :+ createOrderMap(col.getColumnName)
+          }
+          if (!skipPartitionColumn) {
+            selectedColumnSchema = selectedColumnSchema :+ col
+          }
         }
     }
     if (partitionColumnSchema != null) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 38a1096..6f16f30 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -558,7 +558,7 @@ object CommonLoadUtils {
       .getListOfColumns
       .asScala
     if (table.getPartitionInfo != null) {
-      colSchema = colSchema.filterNot(x => x.isInvisible || x.getColumnName.contains(".") ||
+      colSchema = colSchema.filterNot(x => x.isInvisible || x.isComplexColumn ||
                                            x.getSchemaOrdinal == -1 ||
                                            table.getPartitionInfo.getColumnSchemaList.contains(x))
       colSchema = colSchema ++ table
@@ -568,7 +568,7 @@ object CommonLoadUtils {
         .getColumnSchemaList.size()))
         .asInstanceOf[Array[ColumnSchema]]
     } else {
-      colSchema = colSchema.filterNot(x => x.isInvisible || x.getColumnName.contains(".") ||
+      colSchema = colSchema.filterNot(x => x.isInvisible || x.isComplexColumn ||
                                            x.getSchemaOrdinal == -1)
     }
     val updatedRdd: RDD[InternalRow] = CommonLoadUtils.getConvertedInternalRow(
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 81c86ac..57778a8 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -2,12 +2,14 @@ package org.apache.carbondata.geo
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
-class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
+class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+  val table1 = "geoTable1"
+  val table2 = "geotable2"
   override def beforeAll(): Unit = {
     drop()
   }
@@ -70,7 +72,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
     createTable()
     loadData()
     // Test if index handler column is added as a sort column
-    val descTable = sql("describe formatted geotable").collect
+    val descTable = sql(s"describe formatted $table1").collect
     descTable.find(_.get(0).toString.contains("Sort Scope")) match {
       case Some(row) => assert(row.get(1).toString.contains("LOCAL_SORT"))
       case None => assert(false)
@@ -85,7 +87,7 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
     createTable()
     loadData()
     checkAnswer(
-      sql(s"select longitude, latitude from geotable where IN_POLYGON('116.321011 40.123503, " +
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
           s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
       Seq(Row(116187332, 39979316),
         Row(116362699, 39942444),
@@ -95,7 +97,74 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
         Row(116285807, 40084087)))
   }
 
-  after {
+  test("test insert into table select from another table") {
+    val sourceTable = table1;
+    val targetTable = table2;
+    createTable(sourceTable)
+    loadData(sourceTable)
+    createTable(targetTable)
+    sql(s"insert into  $targetTable select * from $sourceTable")
+    checkAnswer(
+      sql(s"select longitude, latitude from $targetTable where IN_POLYGON('116.321011 40.123503, " +
+          s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+      Seq(Row(116187332, 39979316),
+        Row(116362699, 39942444),
+        Row(116288955, 39999101),
+        Row(116325378, 39963129),
+        Row(116337069, 39951887),
+        Row(116285807, 40084087)))
+  }
+
+  test("test insert into table select from another table with target table sort scope as global")
+  {
+    val sourceTable = table1;
+    val targetTable = table2;
+    createTable(sourceTable)
+    loadData(sourceTable)
+    createTable(targetTable, "'SORT_SCOPE'='GLOBAL_SORT',")
+    sql(s"insert into  $targetTable select * from $sourceTable")
+    checkAnswer(
+      sql(s"select longitude, latitude from $targetTable where IN_POLYGON('116.321011 40.123503, " +
+          s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+      Seq(Row(116187332, 39979316),
+        Row(116362699, 39942444),
+        Row(116288955, 39999101),
+        Row(116325378, 39963129),
+        Row(116337069, 39951887),
+        Row(116285807, 40084087)))
+  }
+
+  test("test polygon query on table partitioned by timevalue column")
+  {
+    sql(s"""
+           | CREATE TABLE $table1(
+           | longitude LONG,
+           | latitude LONG) COMMENT "This is a GeoTable" PARTITIONED BY (timevalue BIGINT)
+           | STORED AS carbondata
+           | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
+           | 'INDEX_HANDLER.mygeohash.type'='geohash',
+           | 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
+           | 'INDEX_HANDLER.mygeohash.originLatitude'='39.832277',
+           | 'INDEX_HANDLER.mygeohash.gridSize'='50',
+           | 'INDEX_HANDLER.mygeohash.minLongitude'='115.811865',
+           | 'INDEX_HANDLER.mygeohash.maxLongitude'='116.782233',
+           | 'INDEX_HANDLER.mygeohash.minLatitude'='39.832277',
+           | 'INDEX_HANDLER.mygeohash.maxLatitude'='40.225281',
+           | 'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    loadData()
+    checkAnswer(
+      sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
+          s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+      Seq(Row(116187332, 39979316),
+        Row(116362699, 39942444),
+        Row(116288955, 39999101),
+        Row(116325378, 39963129),
+        Row(116337069, 39951887),
+        Row(116285807, 40084087)))
+  }
+
+  override def afterEach(): Unit = {
     drop()
   }
   override def afterAll(): Unit = {
@@ -103,17 +172,18 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
   }
 
   def drop(): Unit = {
-    sql("drop table if exists geotable")
+    sql(s"drop table if exists $table1")
+    sql(s"drop table if exists $table2")
   }
 
-  def createTable(): Unit = {
+  def createTable(tableName : String = table1, customProperties : String = ""): Unit = {
     sql(s"""
-           | CREATE TABLE geotable(
+           | CREATE TABLE $tableName(
            | timevalue BIGINT,
            | longitude LONG,
            | latitude LONG) COMMENT "This is a GeoTable"
            | STORED AS carbondata
-           | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
+           | TBLPROPERTIES ($customProperties 'INDEX_HANDLER'='mygeohash',
            | 'INDEX_HANDLER.mygeohash.type'='geohash',
            | 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
            | 'INDEX_HANDLER.mygeohash.originLatitude'='39.832277',
@@ -126,8 +196,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
        """.stripMargin)
   }
 
-  def loadData(): Unit = {
-    sql(s"""LOAD DATA local inpath '$resourcesPath/geodata.csv' INTO TABLE geotable OPTIONS
+  def loadData(tableName : String = table1): Unit = {
+    sql(s"""LOAD DATA local inpath '$resourcesPath/geodata.csv' INTO TABLE $tableName OPTIONS
            |('DELIMITER'= ',')""".stripMargin)
   }
 }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
index c807574..cdc8e01 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/parser/impl/JsonRowParser.java
@@ -163,7 +163,7 @@ public class JsonRowParser implements RowParser {
 
   private static String extractChildColumnName(CarbonColumn column) {
     String columnName = column.getColName();
-    if (columnName.contains(".")) {
+    if (column.getColumnSchema().isComplexColumn()) {
       // complex type child column names can be like following
       // a) struct type --> parent.child
       // b) array type --> parent.val.val...child [If create table flow]
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
index eba5b80..04d8669 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/InputProcessorStepWithNoConverterImpl.java
@@ -409,27 +409,28 @@ public class InputProcessorStepWithNoConverterImpl extends AbstractDataLoadProce
 
     private Object[] convertToNoDictionaryToBytesWithoutReArrange(Object[] data,
         DataField[] dataFields) {
-      Object[] newData = new Object[data.length];
+      Object[] newData = new Object[dataFields.length];
       // now dictionary is removed, no need of no dictionary mapping
-      for (int i = 0; i < data.length; i++) {
+      for (int i = 0, index = 0; i < dataFields.length; i++) {
         if (dataFields[i].getColumn().isIndexColumn()) {
           continue;
         }
         if (DataTypeUtil.isPrimitiveColumn(dataTypes[i])) {
           // keep the no dictionary measure column as original data
-          newData[i] = data[i];
+          newData[i] = data[index];
         } else if (dataTypes[i].isComplexType()) {
-          getComplexTypeByteArray(newData, i, data, dataFields[i], i, true);
-        } else if (dataTypes[i] == DataTypes.DATE && data[i] instanceof Long) {
+          getComplexTypeByteArray(newData, i, data, dataFields[i], index, true);
+        } else if (dataTypes[i] == DataTypes.DATE && data[index] instanceof Long) {
           if (dateDictionaryGenerator == null) {
             dateDictionaryGenerator = DirectDictionaryKeyGeneratorFactory
                 .getDirectDictionaryGenerator(dataTypes[i], dataFields[i].getDateFormat());
           }
-          newData[i] = dateDictionaryGenerator.generateKey((long) data[i]);
+          newData[i] = dateDictionaryGenerator.generateKey((long) data[index]);
         } else {
           newData[i] =
-              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(data[i], dataTypes[i]);
+              DataTypeUtil.getBytesDataDataTypeForNoDictionaryColumn(data[index], dataTypes[i]);
         }
+        index++;
       }
       return newData;
     }
diff --git a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
index d9c5486..b13a0c5 100644
--- a/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
+++ b/sdk/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonSchemaReader.java
@@ -259,7 +259,7 @@ public class CarbonSchemaReader {
     List<ColumnSchema> schemaList = reader.readSchema();
     for (int i = 0; i < schemaList.size(); i++) {
       ColumnSchema columnSchema = schemaList.get(i);
-      if (!(columnSchema.getColumnName().contains("."))) {
+      if (!(columnSchema.isComplexColumn())) {
         columnSchemaList.add(columnSchema);
       }
     }