Posted to commits@carbondata.apache.org by in...@apache.org on 2021/10/12 16:20:27 UTC

[carbondata] branch master updated: [CARBONDATA-4292] Spatial index creation using spark dataframe

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b8d9a97  [CARBONDATA-4292] Spatial index creation using spark dataframe
b8d9a97 is described below

commit b8d9a97926db77a8d75c8988b88c9d5f74df3415
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Wed Sep 22 11:41:11 2021 +0530

    [CARBONDATA-4292] Spatial index creation using spark dataframe
    
    Why is this PR needed?
    To support spatial index creation using a Spark DataFrame
    
    What changes were proposed in this PR?
    Added spatial properties to CarbonOption and edited the existing test cases.
    
    Does this PR introduce any user interface change?
    Yes
    
    Is any new testcase added?
    Yes
    
    This closes #4222
---
 docs/spatial-index-guide.md                        |  27 +
 .../org/apache/carbondata/spark/CarbonOption.scala |  22 +
 .../apache/spark/sql/CarbonDataFrameWriter.scala   |  15 +-
 .../command/management/CommonLoadUtils.scala       |   6 +-
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 647 +++++++++++++--------
 5 files changed, 467 insertions(+), 250 deletions(-)

diff --git a/docs/spatial-index-guide.md b/docs/spatial-index-guide.md
index 08d487d..90f1b09 100644
--- a/docs/spatial-index-guide.md
+++ b/docs/spatial-index-guide.md
@@ -74,6 +74,30 @@ create table source_index(id BIGINT, latitude long, longitude long) stored by 'c
 'SPATIAL_INDEX.mygeohash.gridSize'='50',
 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000');
 ```
+
+Create a spatial table using a Spark DataFrame
+
+    val geoSchema = StructType(Seq(StructField("timevalue", LongType, nullable = true),
+    StructField("longitude", LongType, nullable = false),
+    StructField("latitude", LongType, nullable = false)))
+
+    val geoDf = sqlContext.read.option("delimiter", ",").option("header", "true").schema(geoSchema)
+      .csv(s"$resourcesPath/geodata.csv")
+
+    geoDf.write
+      .format("carbondata")
+      .option("tableName", "geo1")
+      .option("SPATIAL_INDEX", "mygeohash")
+      .option("SPATIAL_INDEX.mygeohash.type", "geohash")
+      .option("SPATIAL_INDEX.mygeohash.sourcecolumns", "longitude, latitude")
+      .option("SPATIAL_INDEX.mygeohash.originLatitude", "39.832277")
+      .option("SPATIAL_INDEX.mygeohash.gridSize", "50")
+      .option("SPATIAL_INDEX.mygeohash.conversionRatio", "1000000")
+      .option("SPATIAL_INDEX.mygeohash.class", "org.apache.carbondata.geo.GeoHashIndex")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+
 Note: 
    * `mygeohash` in the above example represents the index name.
    * Columns present in spatial_index table properties cannot be altered
@@ -103,6 +127,9 @@ Load/Insert with custom geoId
 insert into source_index select 0, 1,116.285807,40.084087;
 ```
 
+Note:
+* Loading custom geoId values using a DataFrame is not supported.
+
 ### Select Query
 
 Query with Polygon UDF predicate
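
A minimal usage sketch of the documentation example above: once the table `geo1` has been written through the DataFrame writer, it can be queried with the same Polygon UDF predicate shown in the Select Query section. This assumes an active SparkSession with the CarbonData extensions enabled; table name and polygon coordinates are taken from the existing examples.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("geo-query").getOrCreate()
    // IN_POLYGON resolves matching rows through the table's mygeohash spatial index.
    spark.sql(
      "select longitude, latitude from geo1 where IN_POLYGON(" +
        "'116.321011 40.123503, 116.137676 39.947911, " +
        "116.560993 39.935276, 116.321011 40.123503')").show()
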
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index a4ebfbd..8642922 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -84,5 +84,27 @@ class CarbonOption(options: Map[String, String]) {
 
   lazy val dateformat: Option[String] = options.get("dateformat")
 
+  lazy val SPATIAL_INDEX: Option[String] = options.get("SPATIAL_INDEX")
+
+  val indexName = SPATIAL_INDEX.getOrElse("")
+
+  lazy val SPATIAL_INDEX_type: Option[String] = options.get(
+    s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.type")
+
+  lazy val SPATIAL_INDEX_sourcecolumns: Option[String] = options.get(
+    s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.sourcecolumns")
+
+  lazy val SPATIAL_INDEX_originLatitude: Option[String] = options.get(
+    s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.originLatitude")
+
+  lazy val SPATIAL_INDEX_gridSize: Option[String] = options.get(
+    s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.gridSize")
+
+  lazy val SPATIAL_INDEX_conversionRatio: Option[String] = options.get(
+    s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.conversionRatio")
+
+  lazy val SPATIAL_INDEX_class: Option[String] = options.get(
+    s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.class")
+
   def toMap: Map[String, String] = options
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index a0a871a..c9b77cd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
 import org.apache.carbondata.spark.CarbonOption
@@ -82,6 +83,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
   }
 
   private def makeCreateTableString(schema: StructType, options: CarbonOption): String = {
+    val indexName = options.indexName
     val property = Map(
       "SORT_COLUMNS" -> options.sortColumns,
       "SORT_SCOPE" -> options.sortScope,
@@ -91,7 +93,18 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       "TABLE_PAGE_SIZE_INMB" -> options.tablePageSizeInMb,
       "STREAMING" -> Option(options.isStreaming.toString),
       "DATEFORMAT" -> options.dateformat,
-      "TIMESTAMPFORMAT" -> options.timestampformat
+      "TIMESTAMPFORMAT" -> options.timestampformat,
+      "SPATIAL_INDEX" -> options.SPATIAL_INDEX,
+      s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.type" -> options.SPATIAL_INDEX_type,
+      s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.sourcecolumns" ->
+      options.SPATIAL_INDEX_sourcecolumns,
+      s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.originLatitude" ->
+      options.SPATIAL_INDEX_originLatitude,
+      s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.gridSize" ->
+      options.SPATIAL_INDEX_gridSize,
+      s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.conversionRatio" ->
+      options.SPATIAL_INDEX_conversionRatio,
+      s"${ CarbonCommonConstants.SPATIAL_INDEX }.$indexName.class" -> options.SPATIAL_INDEX_class
     ).filter(_._2.isDefined)
       .map(property => s"'${property._1}' = '${property._2.get}'").mkString(",")
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index fc19a9b..2cf9488 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -922,7 +922,11 @@ object CommonLoadUtils {
               .map(columnName => columnName.toLowerCase())
             attributes.filterNot(a => staticPartCols.contains(a.name.toLowerCase))
           }
-          if (expectedColumns.length != dfAttributes.length) {
+          val spatialProperty = catalogTable.properties.get(CarbonCommonConstants.SPATIAL_INDEX)
+          // For a spatial table, the dataframe attributes will not contain the geoId column.
+          val isSpatialTable = spatialProperty.isDefined && spatialProperty.nonEmpty &&
+                                   dfAttributes.length + 1 == expectedColumns.size
+          if (expectedColumns.length != dfAttributes.length && !isSpatialTable) {
             throw new AnalysisException(
               s"Cannot insert into table $loadParams.tableName because the number of columns are " +
               s"different: " +
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 65e5622..df916d4 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -19,9 +19,9 @@ package org.apache.carbondata.geo
 
 import scala.collection.mutable
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedIndexCommandException}
@@ -30,6 +30,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   val table1 = "geoTable1"
   val table2 = "geotable2"
+  val dfTable1 = "dfGeoTable1"
+  val dfTable2 = "dfGeoTable2"
   val result = Seq(Row(116187332, 39979316),
     Row(116362699, 39942444),
     Row(116288955, 39999101),
@@ -95,6 +97,52 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       s"table."))
   }
 
+  test("test Invalid spatial index property with dataframe") {
+    val geoDf = createDf()
+    // Index name must not match with table column name.  Fails to create table.
+    var exception = intercept[MalformedCarbonCommandException](geoDf.write
+      .format("carbondata")
+      .option("tableName", s"$table1")
+      .option("SPATIAL_INDEX", "longitude")
+      .mode(SaveMode.Overwrite)
+      .save())
+    assert(exception.getMessage.contains(
+      "index: longitude must not match with any other column name in the table"))
+    // Type property is not configured. Fails to create table.
+    exception = intercept[MalformedCarbonCommandException](geoDf.write
+      .format("carbondata")
+      .option("tableName", s"$table1")
+      .option("SPATIAL_INDEX", "mygeohash")
+      .mode(SaveMode.Overwrite)
+      .save())
+    assert(exception.getMessage.contains(
+      s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.type property must be specified"))
+    // Source columns are not configured. Fails to create table.
+    exception = intercept[MalformedCarbonCommandException](geoDf.write
+      .format("carbondata")
+      .option("tableName", s"$table1")
+      .option("SPATIAL_INDEX", "geo1")
+      .option("SPATIAL_INDEX.geo1.type", "geohash")
+      .mode(SaveMode.Overwrite)
+      .save())
+    assert(exception.getMessage.contains(
+      s"${CarbonCommonConstants.SPATIAL_INDEX}.geo1.sourcecolumns property must be " +
+      s"specified."))
+    // Source columns must be present in the table. Fails to create table.
+    exception = intercept[MalformedCarbonCommandException](geoDf.write
+      .format("carbondata")
+      .option("tableName", s"$table1")
+      .option("SPATIAL_INDEX", "mygeohash")
+      .option("SPATIAL_INDEX.mygeohash.type", "geohash")
+      .option("SPATIAL_INDEX.mygeohash.sourcecolumns", "unknown1, unknown2")
+      .mode(SaveMode.Overwrite)
+      .save())
+    assert(exception.getMessage.contains(
+      s"Source column: unknown1 in property " +
+      s"${CarbonCommonConstants.SPATIAL_INDEX}.mygeohash.sourcecolumns must be a column in the " +
+      s"table."))
+  }
+
   test("test geo table with invalid table properties") {
     var exception = intercept[MalformedCarbonCommandException](
       createTable(table1, " 'RANGE_COLUMN'='timevalue', 'COLUMN_META_CACHE' = 'mygeohash', "))
@@ -211,33 +259,39 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("test geo table create index on spatial column") {
     createTable()
-    val exception = intercept[MalformedIndexCommandException](sql(
-      s"""
-         | CREATE INDEX bloom_index ON TABLE $table1 (mygeohash)
-         | AS 'bloomfilter'
-         | PROPERTIES('BLOOM_SIZE'='640000', 'BLOOM_FPP'='0.00001')
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      val exception = intercept[MalformedIndexCommandException](sql(
+        s"""
+           | CREATE INDEX bloom_index ON TABLE $table (mygeohash)
+           | AS 'bloomfilter'
+           | PROPERTIES('BLOOM_SIZE'='640000', 'BLOOM_FPP'='0.00001')
       """.stripMargin))
-    assert(exception.getMessage.contains(
-      s"Spatial Index column is not supported, column 'mygeohash' is spatial column"))
+      assert(exception.getMessage.contains(
+        s"Spatial Index column is not supported, column 'mygeohash' is spatial column"))
+    })
   }
 
   test("test geo table create with spark session and check describe formatted") {
     createTable()
-    // Test if spatial index column is added as a sort column
-    val descTable = sql(s"describe formatted $table1").collect
-    descTable.find(_.get(0).toString.contains("Sort Scope")) match {
-      case Some(row) => assert(row.get(1).toString.contains("LOCAL_SORT"))
-      case None => assert(false)
-    }
-    descTable.find(_.get(0).toString.contains("Sort Columns")) match {
-      case Some(row) => assert(row.get(1).toString.contains("mygeohash"))
-      case None => assert(false)
-    }
-    // Test if spatial index column is added to column schema
-    descTable.find(_.get(0).toString.contains("mygeohash")) match {
-      case Some(row) => assert(row.get(1).toString.contains("bigint"))
-      case None => assert(false)
-    }
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      // Test if spatial index column is added as a sort column
+      val descTable = sql(s"describe formatted $table").collect
+      descTable.find(_.get(0).toString.contains("Sort Scope")) match {
+        case Some(row) => assert(row.get(1).toString.contains("LOCAL_SORT"))
+        case None => assert(false)
+      }
+      descTable.find(_.get(0).toString.contains("Sort Columns")) match {
+        case Some(row) => assert(row.get(1).toString.contains("mygeohash"))
+        case None => assert(false)
+      }
+      // Test if spatial index column is added to column schema
+      descTable.find(_.get(0).toString.contains("mygeohash")) match {
+        case Some(row) => assert(row.get(1).toString.contains("bigint"))
+        case None => assert(false)
+      }
+    })
   }
 
   test("test create geo table with spark session having syntax: using carbondata") {
@@ -274,11 +328,14 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 
   test("test geo table alter spatial index column") {
     createTable()
-    val exception = intercept[MalformedCarbonCommandException](
-      sql(s"update $table1 set (mygeohash)=(111111) where longitude=116285807 "))
-    assert(exception.getMessage.contains(
-      s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
-      s"table property cannot be altered/updated"))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      val exception = intercept[MalformedCarbonCommandException](
+        sql(s"update $table set (mygeohash)=(111111) where longitude=116285807 "))
+      assert(exception.getMessage.contains(
+        s"Columns present in ${ CarbonCommonConstants.SPATIAL_INDEX } " +
+        s"table property cannot be altered/updated"))
+    })
   }
 
   test("test geo table filter by geo spatial index column") {
@@ -291,10 +348,13 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   test("test polygon query") {
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
-          s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
-      result)
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('116.321011 40.123503, " +
+            s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+        result)
+    })
   }
 
   test("test insert into table select from another table") {
@@ -423,10 +483,26 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
            | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
     loadData()
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('116.321011 40.123503, " +
-          s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
-      result)
+    val geodf = createDf()
+    geodf.write
+      .format("carbondata")
+      .option("tableName", s"$dfTable1")
+      .option("partitionColumns", "timevalue")
+      .option("SPATIAL_INDEX", "mygeohash")
+      .option("SPATIAL_INDEX.mygeohash.type", "geohash")
+      .option("SPATIAL_INDEX.mygeohash.sourcecolumns", "longitude, latitude")
+      .option("SPATIAL_INDEX.mygeohash.originLatitude", "39.832277")
+      .option("SPATIAL_INDEX.mygeohash.gridSize", "50")
+      .option("SPATIAL_INDEX.mygeohash.conversionRatio", "1000000")
+      .option("SPATIAL_INDEX.mygeohash.class", "org.apache.carbondata.geo.GeoHashIndex")
+      .mode(SaveMode.Overwrite)
+      .save()
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('116.321011 40.123503, " +
+            s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+        result)
+    })
   }
 
   test("test insert into geo table with customized spatial index and polygon query") {
@@ -479,220 +555,247 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   test("test polygon list query: union of two polygons which are intersected") {
     createTable()
     loadData2()
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
-      Seq(Row(120177080, 30326882),
-        Row(120180685, 30326327),
-        Row(120184976, 30327105),
-        Row(120176365, 30320687),
-        Row(120179669, 30323688),
-        Row(120181001, 30320761),
-        Row(120187094, 30323540),
-        Row(120186192, 30320132),
-        Row(120181001, 30317316)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
-        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946')"),
-      Seq(Row(120184976, 30327105),
-        Row(120189311, 30327549),
-        Row(120187094, 30323540),
-        Row(120193574, 30323651),
-        Row(120186192, 30320132),
-        Row(120190055, 30317464),
-        Row(120196020, 30321651)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
-        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
-        s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
-        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " +
-        s"'OR')"),
-      Seq(Row(120177080, 30326882),
-        Row(120180685, 30326327),
-        Row(120184976, 30327105),
-        Row(120176365, 30320687),
-        Row(120179669, 30323688),
-        Row(120181001, 30320761),
-        Row(120187094, 30323540),
-        Row(120186192, 30320132),
-        Row(120181001, 30317316),
-        Row(120189311, 30327549),
-        Row(120193574, 30323651),
-        Row(120190055, 30317464),
-        Row(120196020, 30321651)))
+    createTableWithDf(createDf2(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+        Seq(Row(120177080, 30326882),
+          Row(120180685, 30326327),
+          Row(120184976, 30327105),
+          Row(120176365, 30320687),
+          Row(120179669, 30323688),
+          Row(120181001, 30320761),
+          Row(120187094, 30323540),
+          Row(120186192, 30320132),
+          Row(120181001, 30317316)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+            s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946')"),
+        Seq(Row(120184976, 30327105),
+          Row(120189311, 30327549),
+          Row(120187094, 30323540),
+          Row(120193574, 30323651),
+          Row(120186192, 30320132),
+          Row(120190055, 30317464),
+          Row(120196020, 30321651)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON_LIST(" +
+            s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+            s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+            s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " +
+            s"'OR')"),
+        Seq(Row(120177080, 30326882),
+          Row(120180685, 30326327),
+          Row(120184976, 30327105),
+          Row(120176365, 30320687),
+          Row(120179669, 30323688),
+          Row(120181001, 30320761),
+          Row(120187094, 30323540),
+          Row(120186192, 30320132),
+          Row(120181001, 30317316),
+          Row(120189311, 30327549),
+          Row(120193574, 30323651),
+          Row(120190055, 30317464),
+          Row(120196020, 30321651)))
+    })
   }
 
   test("test polygon list query: intersection of two polygons which are intersected") {
     createTable()
     loadData2()
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
-      Seq(Row(120177080, 30326882),
-        Row(120180685, 30326327),
-        Row(120184976, 30327105),
-        Row(120176365, 30320687),
-        Row(120179669, 30323688),
-        Row(120181001, 30320761),
-        Row(120187094, 30323540),
-        Row(120186192, 30320132),
-        Row(120181001, 30317316)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
-        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946')"),
-      Seq(Row(120184976, 30327105),
-        Row(120189311, 30327549),
-        Row(120187094, 30323540),
-        Row(120193574, 30323651),
-        Row(120186192, 30320132),
-        Row(120190055, 30317464),
-        Row(120196020, 30321651)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
-        s"'polygon ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
-        s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
-        s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " +
-        s"'AND')"),
-      Seq(Row(120184976, 30327105),
-        Row(120187094, 30323540),
-        Row(120186192, 30320132)))
+    createTableWithDf(createDf2(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+        Seq(Row(120177080, 30326882),
+          Row(120180685, 30326327),
+          Row(120184976, 30327105),
+          Row(120176365, 30320687),
+          Row(120179669, 30323688),
+          Row(120181001, 30320761),
+          Row(120187094, 30323540),
+          Row(120186192, 30320132),
+          Row(120181001, 30317316)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+            s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946')"),
+        Seq(Row(120184976, 30327105),
+          Row(120189311, 30327549),
+          Row(120187094, 30323540),
+          Row(120193574, 30323651),
+          Row(120186192, 30320132),
+          Row(120190055, 30317464),
+          Row(120196020, 30321651)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON_LIST(" +
+            s"'polygon ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+            s"POLYGON ((120.191603 30.328946,120.184179 30.327465,120.181819 30.321464," +
+            s"120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))', " +
+            s"'AND')"),
+        Seq(Row(120184976, 30327105),
+          Row(120187094, 30323540),
+          Row(120186192, 30320132)))
+    })
   }
 
   test("test polygon list query: intersection of two polygons which are not intersected") {
     createTable()
     loadData2()
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
-      Seq(Row(120177080, 30326882),
-        Row(120180685, 30326327),
-        Row(120184976, 30327105),
-        Row(120176365, 30320687),
-        Row(120179669, 30323688),
-        Row(120181001, 30320761),
-        Row(120187094, 30323540),
-        Row(120186192, 30320132),
-        Row(120181001, 30317316)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.164492 30.326279,120.160629 30.318870,120.172259 30.315351,120.164492 30.326279')"),
-      Seq(Row(120164563, 30322243),
-        Row(120168211, 30318057)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
-        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
-        s"POLYGON ((120.164492 30.326279,120.160629 30.318870,120.172259 30.315351," +
-        s"120.164492 30.326279))', " +
-        s"'AND')"),
-      Seq())
+    createTableWithDf(createDf2(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+        Seq(Row(120177080, 30326882),
+          Row(120180685, 30326327),
+          Row(120184976, 30327105),
+          Row(120176365, 30320687),
+          Row(120179669, 30323688),
+          Row(120181001, 30320761),
+          Row(120187094, 30323540),
+          Row(120186192, 30320132),
+          Row(120181001, 30317316)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.164492 30.326279,120.160629 30.318870,120.172259 30.315351,120.164492 " +
+            s"30.326279')"),
+        Seq(Row(120164563, 30322243),
+          Row(120168211, 30318057)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON_LIST(" +
+            s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+            s"POLYGON ((120.164492 30.326279,120.160629 30.318870,120.172259 30.315351," +
+            s"120.164492 30.326279))', " +
+            s"'AND')"),
+        Seq())
+    })
   }
 
   test("test polygon list query: intersection of two polygons when second polygon " +
     "is completely in first polygon") {
     createTable()
     loadData2()
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
-      Seq(Row(120177080, 30326882),
-        Row(120180685, 30326327),
-        Row(120184976, 30327105),
-        Row(120176365, 30320687),
-        Row(120179669, 30323688),
-        Row(120181001, 30320761),
-        Row(120187094, 30323540),
-        Row(120186192, 30320132),
-        Row(120181001, 30317316)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON('" +
-        s"120.179442 30.325205,120.177253 30.322242,120.180944 30.319426," +
-        s"120.186094 30.321834,120.179442 30.325205')"),
-      Seq(Row(120179669, 30323688),
-        Row(120181001, 30320761)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYGON_LIST(" +
-        s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
-        s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
-        s"POLYGON ((120.179442 30.325205,120.177253 30.322242,120.180944 30.319426," +
-        s"120.186094 30.321834,120.179442 30.325205))', " +
-        s"'AND')"),
-      Seq(Row(120179669, 30323688),
-        Row(120181001, 30320761)))
+    createTableWithDf(createDf2(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431')"),
+        Seq(Row(120177080, 30326882),
+          Row(120180685, 30326327),
+          Row(120184976, 30327105),
+          Row(120176365, 30320687),
+          Row(120179669, 30323688),
+          Row(120181001, 30320761),
+          Row(120187094, 30323540),
+          Row(120186192, 30320132),
+          Row(120181001, 30317316)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON('" +
+            s"120.179442 30.325205,120.177253 30.322242,120.180944 30.319426," +
+            s"120.186094 30.321834,120.179442 30.325205')"),
+        Seq(Row(120179669, 30323688),
+          Row(120181001, 30320761)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYGON_LIST(" +
+            s"'POLYGON ((120.176433 30.327431,120.171283 30.322245,120.181411 30.314540," +
+            s"120.190509 30.321653,120.185188 30.329358,120.176433 30.327431)), " +
+            s"POLYGON ((120.179442 30.325205,120.177253 30.322242,120.180944 30.319426," +
+            s"120.186094 30.321834,120.179442 30.325205))', " +
+            s"'AND')"),
+        Seq(Row(120179669, 30323688),
+          Row(120181001, 30320761)))
+    })
   }
 
   test("test one polyline query") {
     createTable()
     loadData2()
-    val df = sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
-        s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, " +
-        s"120.190359 30.315388)', 65)")
-    checkAnswer(df, Seq(Row(120184976, 30327105),
+    createTableWithDf(createDf2(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      val df = sql(s"select longitude, latitude from $table where IN_POLYLINE_LIST(" +
+                   s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 " +
+                   s"30.324464, 120.190359 30.315388)', 65)")
+      checkAnswer(df, Seq(Row(120184976, 30327105),
         Row(120197093, 30325985),
         Row(120196020, 30321651),
         Row(120198638, 30323540)))
-    checkAnswer(sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
-          s"'LINESTRING(120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464, " +
-          s"120.190359 30.315388)', 65)"), df)
+      checkAnswer(sql(s"select longitude, latitude from $table where IN_POLYLINE_LIST(" +
+                      s"'LINESTRING(120.184179 30.327465, 120.191603 30.328946, 120.199242 " +
+                      s"30.324464, 120.190359 30.315388)', 65)"), df)
+    })
   }
 
   test("test polyline list query, result is union of two polylines") {
     createTable()
     loadData2()
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
-        s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464)', 65)"),
-      Seq(Row(120184976, 30327105),
-        Row(120197093, 30325985)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
-        s"'LINESTRING (120.199242 30.324464, 120.190359 30.315388)', 65)"),
-      Seq(Row(120196020, 30321651),
-        Row(120198638, 30323540)))
-    checkAnswer(
-      sql(s"select longitude, latitude from $table1 where IN_POLYLINE_LIST(" +
-        s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464), " +
-        s"linestring (120.199242 30.324464, 120.190359 30.315388)', 65)"),
-      Seq(Row(120184976, 30327105),
-        Row(120197093, 30325985),
-        Row(120196020, 30321651),
-        Row(120198638, 30323540)))
+    createTableWithDf(createDf2(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYLINE_LIST(" +
+            s"'LINESTRING (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464)', " +
+            s"65)"), Seq(Row(120184976, 30327105), Row(120197093, 30325985)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYLINE_LIST(" +
+            s"'LINESTRING (120.199242 30.324464, 120.190359 30.315388)', 65)"),
+        Seq(Row(120196020, 30321651),
+          Row(120198638, 30323540)))
+      checkAnswer(
+        sql(s"select longitude, latitude from $table where IN_POLYLINE_LIST(" +
+            s"'linestring (120.184179 30.327465, 120.191603 30.328946, 120.199242 30.324464), " +
+            s"linestring (120.199242 30.324464, 120.190359 30.315388)', 65)"),
+        Seq(Row(120184976, 30327105),
+          Row(120197093, 30325985),
+          Row(120196020, 30321651),
+          Row(120198638, 30323540)))
+    })
   }
 
   test("test one range list query which have no overlapping range") {
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
-        s"'RANGELIST(855279368848 855279368850, 855280799610 855280799612)', 'OR')"),
-      Seq(Row(855279368850L, 116288955, 39999101),
-        Row(855280799612L, 116285807, 40084087)))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select mygeohash, longitude, latitude from $table where IN_POLYGON_RANGE_LIST(" +
+            s"'RANGELIST(855279368848 855279368850, 855280799610 855280799612)', 'OR')"),
+        Seq(Row(855279368850L, 116288955, 39999101),
+          Row(855280799612L, 116285807, 40084087)))
+    })
   }
 
   test("test one range list query which have overlapping range") {
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
-        s"'RANGELIST (855279368848 855279368850, 855279368849 855279368852)', 'OR')"),
-      Seq(Row(855279368850L, 116288955, 39999101)))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select mygeohash, longitude, latitude from $table where IN_POLYGON_RANGE_LIST(" +
+            s"'RANGELIST (855279368848 855279368850, 855279368849 855279368852)', 'OR')"),
+        Seq(Row(855279368850L, 116288955, 39999101)))
+    })
   }
 
   test("test one range list query when one range contains another range") {
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
-        s"'RANGELIST (855279368848 855279368856, 855279368849 855279368852)', 'OR')"),
-      Seq(Row(855279368850L, 116288955, 39999101)))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select mygeohash, longitude, latitude from $table where IN_POLYGON_RANGE_LIST(" +
+            s"'RANGELIST (855279368848 855279368856, 855279368849 855279368852)', 'OR')"),
+        Seq(Row(855279368850L, 116288955, 39999101)))
+    })
   }
 
   test("test two range lists query: union of two range lists which are intersected") {
@@ -782,28 +885,31 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       "is completely in first range list") {
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
-        s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
-        s"855282156300 855282157400)', 'OR')"),
-      Seq(Row(855279368850L, 116288955, 39999101),
-        Row(855280799612L, 116285807, 40084087),
-        Row(855282156308L, 116337069, 39951887)))
-    checkAnswer(
-      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
-        s"'RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
-        s"855282156301 855282157000)', 'OR')"),
-      Seq(Row(855279368850L, 116288955, 39999101),
-        Row(855282156308L, 116337069, 39951887)))
-    checkAnswer(
-      sql(s"select mygeohash, longitude, latitude from $table1 where IN_POLYGON_RANGE_LIST(" +
-        s"'RANGELIST (855279368840 855279368852, 855280799610 855280799620, " +
-        s"855282156300 855282157400), " +
-        s"RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
-        s"855282156301 855282157000)', " +
-        s"'AND')"),
-      Seq(Row(855279368850L, 116288955, 39999101),
-        Row(855282156308L, 116337069, 39951887)))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select mygeohash, longitude, latitude from $table where IN_POLYGON_RANGE_LIST(" +
+            s"'RANGELIST (855279368850 855279368852, 855280799610 855280799612, " +
+            s"855282156300 855282157400)', 'OR')"),
+        Seq(Row(855279368850L, 116288955, 39999101),
+          Row(855280799612L, 116285807, 40084087),
+          Row(855282156308L, 116337069, 39951887)))
+      checkAnswer(
+        sql(s"select mygeohash, longitude, latitude from $table where IN_POLYGON_RANGE_LIST(" +
+            s"'RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+            s"855282156301 855282157000)', 'OR')"),
+        Seq(Row(855279368850L, 116288955, 39999101),
+          Row(855282156308L, 116337069, 39951887)))
+      checkAnswer(
+        sql(s"select mygeohash, longitude, latitude from $table where IN_POLYGON_RANGE_LIST(" +
+            s"'RANGELIST (855279368840 855279368852, 855280799610 855280799620, " +
+            s"855282156300 855282157400), " +
+            s"RANGELIST (855279368848 855279368850, 855280799613 855280799615, " +
+            s"855282156301 855282157000)', " +
+            s"'AND')"),
+        Seq(Row(855279368850L, 116288955, 39999101),
+          Row(855282156308L, 116337069, 39951887)))
+    })
   }
 
   test("test transforming GeoId to GridXY") {
@@ -812,10 +918,13 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       Seq(Row(Seq(613089, 722908))))
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select longitude, latitude, mygeohash, GeoIdToGridXy(mygeohash) as GridXY " +
-        s"from $table1 where mygeohash = 855279270226"),
-      Seq(Row(116302895, 39930753, 855279270226L, Seq(613089, 722908))))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude, mygeohash, GeoIdToGridXy(mygeohash) as GridXY " +
+            s"from $table where mygeohash = 855279270226"),
+        Seq(Row(116302895, 39930753, 855279270226L, Seq(613089, 722908))))
+    })
   }
 
   test("test transforming latitude and longitude to GeoId") {
@@ -824,11 +933,14 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       Seq(Row(855279270226L)))
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select longitude, latitude, mygeohash, " +
-        s"LatLngToGeoId(latitude, longitude, 39.832277, 50) as geoId " +
-        s"from $table1 where mygeohash = 855279270226"),
-      Seq(Row(116302895, 39930753, 855279270226L, 855279270226L)))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude, mygeohash, " +
+            s"LatLngToGeoId(latitude, longitude, 39.832277, 50) as geoId " +
+            s"from $table where mygeohash = 855279270226"),
+        Seq(Row(116302895, 39930753, 855279270226L, 855279270226L)))
+    })
   }
 
   test("test transforming GeoId to latitude and longitude") {
@@ -837,11 +949,14 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       Seq(Row(Seq(39.930529, 116.303093))))
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select longitude, latitude, mygeohash, " +
-        s"GeoIdToLatLng(mygeohash, 39.832277, 50) as LatitudeAndLongitude " +
-        s"from $table1 where mygeohash = 855279270226"),
-      Seq(Row(116302895, 39930753, 855279270226L, Seq(39.930529, 116.303093))))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude, mygeohash, " +
+            s"GeoIdToLatLng(mygeohash, 39.832277, 50) as LatitudeAndLongitude " +
+            s"from $table where mygeohash = 855279270226"),
+        Seq(Row(116302895, 39930753, 855279270226L, Seq(39.930529, 116.303093))))
+    })
   }
 
   test("test transforming to upper layer geoId") {
@@ -850,11 +965,14 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
       Seq(Row(213819817556L)))
     createTable()
     loadData()
-    checkAnswer(
-      sql(s"select longitude, latitude, mygeohash, " +
-        s"ToUpperLayerGeoId(mygeohash) as upperLayerGeoId " +
-        s"from $table1 where mygeohash = 855279270226"),
-      Seq(Row(116302895, 39930753, 855279270226L, 213819817556L)))
+    createTableWithDf(createDf(), dfTable1)
+    List(table1, dfTable1).foreach(table => {
+      checkAnswer(
+        sql(s"select longitude, latitude, mygeohash, " +
+            s"ToUpperLayerGeoId(mygeohash) as upperLayerGeoId " +
+            s"from $table where mygeohash = 855279270226"),
+        Seq(Row(116302895, 39930753, 855279270226L, 213819817556L)))
+    })
   }
 
   test("test transforming polygon string to rangeList") {
@@ -889,6 +1007,8 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
   def drop(): Unit = {
     sql(s"drop table if exists $table1")
     sql(s"drop table if exists $table2")
+    sql(s"drop table if exists $dfTable1")
+    sql(s"drop table if exists $dfTable2")
   }
 
   def createTable(tableName : String = table1, customProperties : String = ""): Unit = {
@@ -917,4 +1037,35 @@ class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
     sql(s"""LOAD DATA local inpath '$resourcesPath/geodata2.csv' INTO TABLE $tableName OPTIONS
            |('DELIMITER'= ',')""".stripMargin)
   }
+
+  def createDf(): DataFrame = {
+    val geoSchema = StructType(Seq(StructField("timevalue", LongType, nullable = true),
+      StructField("longitude", LongType, nullable = false),
+      StructField("latitude", LongType, nullable = false)))
+    sqlContext.read.option("delimeter", ",").option("header", "true").schema(geoSchema)
+      .csv(s"$resourcesPath/geodata.csv")
+  }
+
+  def createDf2(): DataFrame = {
+    val geoSchema = StructType(Seq(StructField("timevalue", LongType, nullable = true),
+      StructField("longitude", LongType, nullable = false),
+      StructField("latitude", LongType, nullable = false)))
+    sqlContext.read.option("delimeter", ",").option("header", "true").schema(geoSchema)
+      .csv(s"$resourcesPath/geodata2.csv")
+  }
+
+  def createTableWithDf(geoDf: DataFrame, tableName: String = dfTable1): Unit = {
+    geoDf.write
+      .format("carbondata")
+      .option("tableName", s"$tableName")
+      .option("SPATIAL_INDEX", "mygeohash")
+      .option("SPATIAL_INDEX.mygeohash.type", "geohash")
+      .option("spatial_index.MyGeoHash.sourcecolumns", "longitude, latitude")
+      .option("SPATIAL_INDEX.MyGeoHash.originLatitude", "39.832277")
+      .option("SPATIAL_INDEX.mygeohash.gridSize", "50")
+      .option("spatial_index.mygeohash.conversionRatio", "1000000")
+      .option("spatial_index.mygeohash.CLASS", "org.apache.carbondata.geo.GeoHashIndex")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
 }