You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/08/18 14:01:14 UTC

[carbondata] branch master updated: [CARBONDATA-3943] Handling the addition of geo column to hive at the time of table creation.

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 896a9bf  [CARBONDATA-3943] Handling the addition of geo column to hive at the time of table creation.
896a9bf is described below

commit 896a9bf1745b7f069789dabdb1afaaca705a2e4e
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Wed Aug 5 22:45:00 2020 +0530

    [CARBONDATA-3943] Handling the addition of geo column to hive at the time of table creation.
    
    Why is this PR needed?
    PR #3774 adds geocolumn to hive when it is generated at the time of load.
    
    What changes were proposed in this PR?
    Handling the addition of column at create table itself. Added example class for the scenario
    to check create geo table with carbon session.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3879
---
 .../GeoTableExampleWithCarbonSession.scala         | 93 ++++++++++++++++++++++
 .../scala/org/apache/spark/sql/CarbonSource.scala  | 18 ++++-
 .../spark/sql/catalyst/CarbonParserUtil.scala      |  4 +-
 .../spark/sql/hive/CarbonFileMetastore.scala       |  3 +-
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 39 +++++++--
 5 files changed, 145 insertions(+), 12 deletions(-)

diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/GeoTableExampleWithCarbonSession.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/GeoTableExampleWithCarbonSession.scala
new file mode 100644
index 0000000..ee4a4a0
--- /dev/null
+++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/GeoTableExampleWithCarbonSession.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.examples
+
+import java.io.File
+
+import org.apache.log4j.PropertyConfigurator
+import org.apache.spark.sql.SparkSession
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.examples.util.ExampleUtils
+
+object GeoTableExampleWithCarbonSession {
+
+  def main(args: Array[String]) {
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    System.setProperty("path.target", s"$rootPath/examples/spark/target")
+    // print profiler log to a separated file: target/profiler.log
+    PropertyConfigurator.configure(
+      s"$rootPath/examples/spark/src/main/resources/log4j.properties")
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "false")
+    val spark = ExampleUtils.createCarbonSession("GeoTableExampleWithCarbonSession")
+    spark.sparkContext.setLogLevel("error")
+    Seq(
+      "stored as carbondata",
+      "using carbondata",
+      "stored by 'carbondata'",
+      "stored by 'org.apache.carbondata.format'"
+    ).foreach { formatSyntax =>
+      exampleBody(spark, formatSyntax)
+    }
+    spark.close()
+  }
+
+  def exampleBody(spark: SparkSession, formatSyntax: String = "stored as carbondata"): Unit = {
+
+    val rootPath = new File(this.getClass.getResource("/").getPath
+                            + "../../../..").getCanonicalPath
+    val path = s"$rootPath/integration/spark/src/test/resources/geodata.csv"
+
+    spark.sql("DROP TABLE IF EXISTS geoTable")
+
+    // Create table
+    spark.sql(
+      s"""
+        CREATE TABLE geoTable(
+         | timevalue BIGINT,
+         | longitude LONG,
+         | latitude LONG)
+         | $formatSyntax
+         |  TBLPROPERTIES ('SPATIAL_INDEX'='mygeohash',
+         | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    spark.sql("select *from geoTable").show()
+    val descTable = spark.sql(s"describe formatted geoTable").collect
+    // Test if spatial index column is added to column schema
+    descTable.find(_.get(0).toString.contains("mygeohash")) match {
+      case Some(row) => assert(row.get(1).toString.contains("bigint"))
+      case None => assert(false)
+    }
+    spark.sql(s"""LOAD DATA local inpath '$path' INTO TABLE geoTable OPTIONS
+           |('DELIMITER'= ',')""".stripMargin)
+    spark.sql("select *from geoTable").show()
+    spark.sql("DROP TABLE IF EXISTS geoTable")
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 060de42..aab1a8c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.hive.CarbonMetaStore
 import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{Metadata, StructField, StructType}
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -281,10 +281,22 @@ object CarbonSource {
       isExternal)
     val updatedFormat = CarbonToSparkAdapter
       .getUpdatedStorageFormat(storageFormat, updatedTableProperties, tableInfo.getTablePath)
+    var catalogSchema = table.schema
+    val columnSchemas = tableInfo.getFactTable.getListOfColumns
+    val spatialProperty = updatedTableProperties.get(CarbonCommonConstants.SPATIAL_INDEX)
+    // In case of geo table, an additional spatial column is generated.
+    // Update schema with new column.
+    if (spatialProperty.isDefined) {
+      val spatialColumn = columnSchemas.asScala
+        .find(schema => schema.getColumnName.equalsIgnoreCase(spatialProperty.get.trim)).get
+      val additionalSchema = StructType(Array(StructField(spatialColumn.getColumnName,
+        org.apache.spark.sql.types.DataTypes.LongType, true, Metadata.empty)))
+      catalogSchema = StructType(additionalSchema ++ catalogSchema)
+    }
     val updatedSchema = if (isExternal) {
-      table.schema
+      catalogSchema
     } else {
-      CarbonSparkUtil.updateStruct(table.schema)
+      CarbonSparkUtil.updateStruct(catalogSchema)
     }
     table.copy(
       storage = updatedFormat,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
index 55a6e9d..208befc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonParserUtil.scala
@@ -213,13 +213,13 @@ object CarbonParserUtil {
 
     // Process spatial index property
     val indexFields = processSpatialIndexProperty(tableProperties, fields)
-    val allFields = fields ++ indexFields
+    val allFields = indexFields ++ fields
 
     // do not allow below key words as column name
     validateColumnNames(allFields)
     CommonUtil.validateForSpatialTypeColumn(tableProperties)
 
-    fields.zipWithIndex.foreach { case (field, index) =>
+    allFields.zipWithIndex.foreach { case (field, index) =>
       field.schemaOrdinal = index
     }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index b16579e..932c175 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -238,7 +238,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
         // Here, catalogTable will have spatial column in schema which is used to build carbon
         // table. As spatial column is not supposed to be present in user-defined columns,
         // removing it here. Later from tableproperties the column will be added in carbonTable.
-        val spatialProperty = catalogTable.properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+        val spatialProperty = catalogTable.storage.properties
+          .get(CarbonCommonConstants.SPATIAL_INDEX)
         if (spatialProperty.isDefined) {
           val originalSchema = StructType(catalogTable.schema.
             filterNot(_.name.equalsIgnoreCase(spatialProperty.get.trim)))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 7235527..b7fcd37 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -131,7 +131,6 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("test materialized view with spatial column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedCarbonCommandException](sql(
       s"CREATE MATERIALIZED VIEW view1 AS SELECT longitude, mygeohash FROM $table1"))
     assert(exception.getMessage.contains(
@@ -141,7 +140,6 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("test geo table create index on spatial column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedIndexCommandException](sql(
       s"""
          | CREATE INDEX bloom_index ON TABLE $table1 (mygeohash)
@@ -152,9 +150,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       s"Spatial Index column is not supported, column 'mygeohash' is spatial column"))
   }
 
-  test("test geo table create and load and check describe formatted") {
+  test("test geo table create with spark session and check describe formatted") {
     createTable()
-    loadData()
     // Test if spatial index column is added as a sort column
     val descTable = sql(s"describe formatted $table1").collect
     descTable.find(_.get(0).toString.contains("Sort Scope")) match {
@@ -165,11 +162,42 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       case Some(row) => assert(row.get(1).toString.contains("mygeohash"))
       case None => assert(false)
     }
+    // Test if spatial index column is added to column schema
+    descTable.find(_.get(0).toString.contains("mygeohash")) match {
+      case Some(row) => assert(row.get(1).toString.contains("bigint"))
+      case None => assert(false)
+    }
+  }
+
+  test("test create geo table with spark session having syntax: using carbondata") {
+    sql(
+      s"""
+         | CREATE TABLE $table1(
+         | timevalue BIGINT,
+         | longitude LONG,
+         | latitude LONG)
+         | using carbondata
+         | options ('SPATIAL_INDEX'='mygeohash',
+         | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+         | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+         | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+         | 'SPATIAL_INDEX.mygeohash.minLongitude'='115.811865',
+         | 'SPATIAL_INDEX.mygeohash.maxLongitude'='116.782233',
+         | 'SPATIAL_INDEX.mygeohash.minLatitude'='39.832277',
+         | 'SPATIAL_INDEX.mygeohash.maxLatitude'='40.225281',
+         | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
+       """.stripMargin)
+    val descTable = sql(s"describe formatted $table1").collect
+    // Test if spatial index column is added to column schema
+    descTable.find(_.get(0).toString.contains("mygeohash")) match {
+      case Some(row) => assert(row.get(1).toString.contains("bigint"))
+      case None => assert(false)
+    }
   }
 
   test("test geo table drop spatial index column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedCarbonCommandException](
       sql(s"alter table $table1 drop columns(mygeohash)"))
     assert(exception.getMessage.contains(
@@ -179,7 +207,6 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("test geo table alter spatial index column") {
     createTable()
-    loadData()
     val exception = intercept[MalformedCarbonCommandException](
       sql(s"update $table1 set (mygeohash)=(111111) where longitude=116285807 "))
     assert(exception.getMessage.contains(