Posted to commits@carbondata.apache.org by in...@apache.org on 2021/08/24 12:51:42 UTC

[carbondata] branch master updated: [CARBONDATA-4119][CARBONDATA-4238][CARBONDATA-4237][CARBONDATA-4236] Support geo insert without geoId and document changes

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8de65a2  [CARBONDATA-4119][CARBONDATA-4238][CARBONDATA-4237][CARBONDATA-4236] Support geo insert without geoId and document changes
8de65a2 is described below

commit 8de65a2767a1d20ff3cad947303c686a0017df43
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Mon Aug 16 23:10:56 2021 +0530

    [CARBONDATA-4119][CARBONDATA-4238][CARBONDATA-4237][CARBONDATA-4236] Support geo insert without geoId and document changes
    
    Why is this PR needed?
    1. To support insert without geoId (like load) on a geo table.
    2. [CARBONDATA-4119] : User Input for GeoID column not validated.
    3. [CARBONDATA-4238] : Documentation Issue in ddl-of-carbondata.md#add-columns
    4. [CARBONDATA-4237] : Documentation issues in streaming-guide.md, file-structure-of-carbondata.md and sdk-guide.md.
    5. [CARBONDATA-4236] : Documentation issues in configuration-parameters.md.
    6. The import of the processing class in streaming-guide.md is wrong.
    
    What changes were proposed in this PR?
    1. Made changes to support insert on a geo table with an auto-generated geoId (a short usage sketch follows this list).
    2. [CARBONDATA-4119] : Added documentation about insert with custom geoId. Changes in docs/spatial-index-guide.md
    3. Other documentation changes added.
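
    A minimal usage sketch, spark-shell style as in GeoTest.scala, reusing the
    table name and values from the docs/spatial-index-guide.md change below:

      // geoId column omitted: a geoId value is generated internally
      sql("insert into source_index select 1,116.285807,40.084087")
      // leading value supplied: it is stored as the custom geoId
      sql("insert into source_index select 0, 1,116.285807,40.084087")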
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4205
---
 .../core/constants/CarbonCommonConstants.java      |   4 +--
 docs/configuration-parameters.md                   |   4 +--
 docs/ddl-of-carbondata.md                          |   2 +-
 docs/file-structure-of-carbondata.md               |  15 +++++-----
 docs/images/2-1_1_latest.PNG                       | Bin 0 -> 31294 bytes
 docs/sdk-guide.md                                  |   4 +--
 docs/spatial-index-guide.md                        |   9 ++++++
 docs/streaming-guide.md                            |   5 +++-
 .../spark/sql/hive/CarbonAnalysisRules.scala       |  31 +++++++++++++++------
 .../scala/org/apache/carbondata/geo/GeoTest.scala  |  11 ++++++++
 10 files changed, 60 insertions(+), 25 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7b2fddf..d72d6c1 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2527,9 +2527,9 @@ public final class CarbonCommonConstants {
 
   /**
    * Default value for SI segment Compaction / merge small files
-   * Making this true degrade the LOAD performance
+   * Making this true degrades the LOAD performance
    * When the number of small files increase for SI segments(it can happen as number of columns will
-   * be less and we store position id and reference columns), user an either set to true which will
+   * be less and we store position id and reference columns), user can either set to true which will
    * merge the data files for upcoming loads or run SI rebuild command which does this job for all
    * segments. (REBUILD INDEX <index_table>)
    */
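
A minimal sketch of setting the property documented above from application
code, assuming the CarbonProperties API (the key can also be configured in the
carbon.properties file):

```
import org.apache.carbondata.core.util.CarbonProperties

// Opt in to merging small SI data files on upcoming loads; as the comment
// above notes, this trades LOAD performance for fewer small files.
CarbonProperties.getInstance()
  .addProperty("carbon.si.segment.merge", "true")
```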
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 7a1b610..73bf2ce 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -116,8 +116,8 @@ This section provides the details of all the configurations required for the Car
 | carbon.enable.page.level.reader.in.compaction|false|Enabling page level reader for compaction reduces the memory usage while compacting more number of segments. It allows reading only page by page instead of reading whole blocklet to memory. **NOTE:** Please refer to [file-structure-of-carbondata](./file-structure-of-carbondata.md#carbondata-file-format) to understand the storage format of CarbonData and concepts of pages.|
 | carbon.concurrent.compaction | true | Compaction of different tables can be executed concurrently. This configuration determines whether to compact all qualifying tables in parallel or not. **NOTE:** Compacting concurrently is a resource demanding operation and needs more resources there by affecting the query performance also. This configuration is **deprecated** and might be removed in future releases. |
 | carbon.compaction.prefetch.enable | false | Compaction operation is similar to Query + data load where in data from qualifying segments are queried and data loading performed to generate a new single segment. This configuration determines whether to query ahead data from segments and feed it for data loading. **NOTE:** This configuration is disabled by default as it needs extra resources for querying extra data. Based on the memory availability on the cluster, user can enable it to imp [...]
-| carbon.enable.range.compaction | true | To configure Ranges-based Compaction to be used or not for RANGE_COLUMN. If true after compaction also the data would be present in ranges. |
-| carbon.si.segment.merge | false | Making this true degrade the LOAD performance. When the number of small files increase for SI segments(it can happen as number of columns will be less and we store position id and reference columns), user an either set to true which will merge the data files for upcoming loads or run SI refresh command which does this job for all segments. (REFRESH INDEX <index_table>) |
+| carbon.enable.range.compaction | true | To configure Range-based Compaction to be used or not for RANGE_COLUMN. If true after compaction also the data would be present in ranges. |
+| carbon.si.segment.merge | false | Making this true degrades the LOAD performance. When the number of small files increase for SI segments(it can happen as number of columns will be less and we store position id and reference columns), user can either set to true which will merge the data files for upcoming loads or run SI refresh command which does this job for all segments. (REFRESH INDEX <index_table>) |
 | carbon.partition.data.on.tasklevel | false | When enabled, tasks launched for Local sort partition load will be based on one node one task. Compaction will be performed based on task level for a partition. Load performance might be degraded, because, the number of tasks launched is equal to number of nodes in case of local sort. For compaction, memory consumption will be less, as more number of tasks will be launched for a partition |
 
 ## Query Configuration
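
For segments that have already accumulated small SI files, the entry above
points to the SI refresh command; a hedged sketch (the ON TABLE form and the
names here are assumptions based on the secondary-index guide):

```
// Merge the small data files of an existing secondary index table.
sql("REFRESH INDEX index_table ON TABLE main_table")
```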
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index b05a54a..dd09b91 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -778,7 +778,7 @@ CarbonData DDL statements are documented here,which includes:
       **NOTE:** Adding of only single-level Complex datatype columns(only array and struct) is supported.
       Example - 
       ```
-      ALTER TABLE <table-name> ADD COLUMNS(arrField array<array<int>>, structField struct<id1:string,name1:string>)
+      ALTER TABLE <table-name> ADD COLUMNS(arrField array<int>, structField struct<id1:string,name1:string>)
       ```
 Users can specify which columns to include and exclude for local dictionary generation after adding new columns. These will be appended with the already existing local dictionary include and exclude columns of main table respectively.
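
A short sketch combining the corrected ADD COLUMNS example with the
local-dictionary note above (table and column names are hypothetical):

```
// Add a single-level complex column plus a string column, and extend the
// local dictionary include list with the new string column.
sql("""ALTER TABLE user_table ADD COLUMNS(arrField array<int>, remarks string)
       TBLPROPERTIES('LOCAL_DICTIONARY_INCLUDE'='remarks')""")
```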
      
diff --git a/docs/file-structure-of-carbondata.md b/docs/file-structure-of-carbondata.md
index f82ccf3..449753e 100644
--- a/docs/file-structure-of-carbondata.md
+++ b/docs/file-structure-of-carbondata.md
@@ -46,14 +46,13 @@ The CarbonData files are stored in the location specified by the ***spark.sql.wa
 
   The file directory structure is as below: 
 
-![File Directory Structure](../docs/images/2-1_1.png?raw=true)
-
-1. ModifiedTime.mdt records the timestamp of the metadata with the modification time attribute of the file. When the drop table and create table are used, the modification time of the file is updated. This is common to all databases and hence is kept in parallel to databases
-2. The **default** is the database name and contains the user tables.default is used when user doesn't specify any database name;else user configured database name will be the directory name. user_table is the table name.
-3. Metadata directory stores schema files, tablestatus and dictionary files (including .dict, .dictmeta and .sortindex). There are three types of metadata data information files.
-4. data and index files are stored under directory named **Fact**. The Fact directory has a Part0 partition directory, where 0 is the partition number.
-5. There is a Segment_0 directory under the Part0 directory, where 0 is the segment number.
-6. There are two types of files, carbondata and carbonindex, in the Segment_0 directory.
+![File Directory Structure](../docs/images/2-1_1_latest.PNG?raw=true)
+
+1. The **default** is the database name and contains the user tables.default is used when user doesn't specify any database name;else user configured database name will be the directory name. user_table is the table name.
+2. Metadata directory stores schema files, tablestatus and segment details (includes .segment file for each segment). There are three types of metadata data information files.
+3. data and index files are stored under directory named **Fact**. The Fact directory has a Part0 partition directory, where 0 is the partition number.
+4. There is a Segment_0 directory under the Part0 directory, where 0 is the segment number.
+5. There are two types of files, carbondata and carbonmergeindex, in the Segment_0 directory.
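
For reference, the layout described by items 1-5 above, sketched as a
directory tree (only the files named in this list are shown; a real store
contains additional housekeeping files):

```
<spark.sql.warehouse.dir>/
  default/                      <- database directory
    user_table/                 <- table directory
      Metadata/                 <- schema, tablestatus, segment files
      Fact/
        Part0/                  <- partition number 0
          Segment_0/            <- segment number 0
            *.carbondata
            *.carbonmergeindex
```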
 
 
 
diff --git a/docs/images/2-1_1_latest.PNG b/docs/images/2-1_1_latest.PNG
new file mode 100644
index 0000000..c15581d
Binary files /dev/null and b/docs/images/2-1_1_latest.PNG differ
diff --git a/docs/sdk-guide.md b/docs/sdk-guide.md
index 4da7893..fdd689c 100644
--- a/docs/sdk-guide.md
+++ b/docs/sdk-guide.md
@@ -742,9 +742,7 @@ int i = 0;
 while (reader.hasNext()) {
     Object[] row = (Object[]) reader.readNextRow();
     System.out.println(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t",
-        i, row[0], row[1], row[2], row[3], row[4], row[5],
-        new Date((day * ((int) row[6]))), new Timestamp((long) row[7] / 1000), row[8]
-    ));
+        i, row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8]));
     i++;
 }
 
diff --git a/docs/spatial-index-guide.md b/docs/spatial-index-guide.md
index db7392c..8484d41 100644
--- a/docs/spatial-index-guide.md
+++ b/docs/spatial-index-guide.md
@@ -91,6 +91,15 @@ Note:
 | SPATIAL_INDEX.xxx.conversionRatio | Conversion factor. It allows user to translate longitude and latitude to long. For example, if the data to load is longitude = 13.123456, latitude = 101.12356. User can configure conversion ratio sub-property value as 1000000, and change data to load as longitude = 13123456 and latitude = 10112356. Operations on long is much faster compared to floating-point numbers.|
 | SPATIAL_INDEX.xxx.class | Optional user custom implementation class. Value is fully qualified class name.|
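
The conversionRatio sub-property above scales floating-point coordinates to
longs; a small worked sketch using the coordinates from the Load/Insert
example below (the rounding helper is illustrative, not CarbonData code):

```
val conversionRatio = 1000000L           // SPATIAL_INDEX.xxx.conversionRatio
val longitude = 116.285807
val latitude  = 40.084087
val lonAsLong = math.round(longitude * conversionRatio)  // 116285807
val latAsLong = math.round(latitude  * conversionRatio)  // 40084087
```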
 
+### Load/Insert
+Load/Insert with no geoId column, then geoId will be generated internally.
+```
+insert into source_index select 1,116.285807,40.084087;
+```
+Load/Insert with custom geoId
+```
+insert into source_index select 0, 1,116.285807,40.084087;
+```
 
 ### Select Query
 
diff --git a/docs/streaming-guide.md b/docs/streaming-guide.md
index aec9b3c..3d5da85 100644
--- a/docs/streaming-guide.md
+++ b/docs/streaming-guide.md
@@ -62,7 +62,8 @@ Start spark-shell in new terminal, type :paste, then copy and run the following
  import java.io.File
  import org.apache.spark.sql.{CarbonEnv, SparkSession}
  import org.apache.spark.sql.CarbonSession._
- import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
+ import org.apache.spark.sql.streaming.Trigger.ProcessingTime
+ import org.apache.spark.sql.streaming.StreamingQuery
  import org.apache.carbondata.core.util.path.CarbonTablePath
  import org.apache.carbondata.streaming.parser.CarbonStreamParser
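
The corrected import reflects where ProcessingTime now lives in Spark; a
minimal standalone sketch of building the trigger (the interval value is
illustrative):

```
import org.apache.spark.sql.streaming.Trigger

// ProcessingTime is obtained through the Trigger factory rather than the
// deprecated org.apache.spark.sql.streaming.ProcessingTime class.
val trigger = Trigger.ProcessingTime("5 seconds")
```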
 
@@ -148,6 +149,8 @@ TBLPROPERTIES('streaming'='true')
  DESC FORMATTED streaming_table
  ```
 
+NOTE: Streaming table doesn't support alter table schema operations such as alter add column, drop column, rename column, change datatype and rename table name.
+
 ## Alter streaming property
 For an old table, use ALTER TABLE command to set the streaming property.
 ```sql
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 3689ec1..74bdca8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -22,7 +22,7 @@ import org.apache.spark.SPARK_VERSION
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonExpressions.CarbonUnresolvedRelation
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Expression, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.command.LoadDataCommand
 import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.strategy.{CarbonPlanHelper, DMLHelper}
+import org.apache.spark.sql.types.NullType
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
@@ -267,17 +268,31 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
       child: LogicalPlan,
       containsMultipleInserts: Boolean): LogicalPlan = {
     val carbonDSRelation = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
-    if (carbonDSRelation.carbonRelation.output.size > CarbonCommonConstants
+    val carbonTable = carbonDSRelation.carbonRelation.carbonTable
+    val tableProperties = carbonTable.getTableInfo.getFactTable.getTableProperties
+    val spatialProperty = tableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
+    val expectedOutput = carbonDSRelation.carbonRelation.output
+    if (expectedOutput.size > CarbonCommonConstants
       .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
       CarbonException.analysisException(
         s"Maximum number of columns supported: " +
           s"${CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS}")
     }
+    var newLogicalPlan = child
+    if (spatialProperty != null && !spatialProperty.isEmpty &&
+        child.output.size + 1 == expectedOutput.size) {
+      newLogicalPlan = child.transform {
+        // To support insert sql to automatically generate GeoId if customized input is not given.
+        case p: Project =>
+          val geoId = Alias(Literal(null, NullType).asInstanceOf[Expression], "NULL")()
+          val list = Seq(geoId) ++ p.projectList
+          Project(list, p.child)
+      }
+    }
     // In spark, PreprocessTableInsertion rule has below cast logic.
     // It was missed in carbon when implemented insert into rules.
-    val actualOutput = child.output
-    val expectedOutput = carbonDSRelation.carbonRelation.output
-    var newChildOutput = child.output.zip(expectedOutput)
+    val actualOutput = newLogicalPlan.output
+    var newChildOutput = newLogicalPlan.output.zip(expectedOutput)
       .map {
         case (actual, expected) =>
           if (expected.dataType.sameType(actual.dataType) &&
@@ -292,7 +307,7 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
               explicitMetadata = Option(expected.metadata))
           }
       } ++ actualOutput.takeRight(actualOutput.size - expectedOutput.size)
-    if (newChildOutput.size >= carbonDSRelation.carbonRelation.output.size ||
+    if (newChildOutput.size >= expectedOutput.size ||
         carbonDSRelation.carbonTable.isHivePartitionTable) {
       newChildOutput = newChildOutput.zipWithIndex.map { columnWithIndex =>
         columnWithIndex._1 match {
@@ -301,10 +316,10 @@ case class CarbonPreInsertionCasts(sparkSession: SparkSession) extends Rule[Logi
           case attr => attr
         }
       }
-      val newChild: LogicalPlan = if (newChildOutput == child.output) {
+      val newChild: LogicalPlan = if (newChildOutput == newLogicalPlan.output) {
         throw new UnsupportedOperationException(s"Spark version $SPARK_VERSION is not supported")
       } else {
-        Project(newChildOutput, child)
+        Project(newChildOutput, newLogicalPlan)
       }
 
       val overwrite = CarbonReflectionUtils.getOverWriteOption("overwrite", p)
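
The core of the rule change above, shown as a standalone spark-shell sketch
(the session setup and sample plan are assumptions; the actual rule only does
this when the table has the SPATIAL_INDEX property and exactly one column is
missing from the insert):

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.types.NullType

val spark = SparkSession.builder().master("local[1]").appName("geoIdSketch").getOrCreate()
// Any analyzed plan with a Project on top stands in for the insert's child.
val child = spark.range(1).selectExpr("id AS longitude").queryExecution.analyzed
// Prepend a NULL alias so the missing geoId slot is filled; the existing cast
// logic then lines the remaining columns up with the table schema.
val withGeoIdSlot = child.transform {
  case p: Project =>
    Project(Seq(Alias(Literal(null, NullType), "NULL")()) ++ p.projectList, p.child)
}
```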
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index c7b2e5a..65e5622 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -868,6 +868,17 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     )
   }
 
+  test("test insert with autogenerated geoid") {
+    createTable()
+    // insert without geoid
+    sql(s"insert into $table1 select 1575428400000,116285807,40084087")
+    // insert with customized geoid
+    sql(s"insert into $table1 select 0,1575428400000,116285807,40084087")
+    checkAnswer(sql(s"select *from $table1"),
+      Seq(Row(855280799612L, 1575428400000L, 116285807, 40084087),
+        Row(0, 1575428400000L, 116285807, 40084087)))
+  }
+
   override def afterEach(): Unit = {
     drop()
   }