You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2021/05/10 05:19:19 UTC

[carbondata] branch master updated: [CARBONDATA-4166] Geo spatial Query Enhancements

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c825730  [CARBONDATA-4166] Geo spatial Query Enhancements
c825730 is described below

commit c825730bb4306583dcc5d3f183209c241314bfde
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Fri Mar 12 08:19:43 2021 +0530

    [CARBONDATA-4166] Geo spatial Query Enhancements
    
    Why is this PR needed?
    Currently, for IN_POLYGON_LIST and IN_POLYLINE_LIST udf’s, polygons need to be
    specified in SQL. If the polygon list grows in size, then the SQL will also be too long,
    which may affect query performance, as SQL analysing cost will be more.
    If Polygons are defined as a Column in a new dimension table, then, Spatial dimension
    table join can be supported in order to support aggregation on spatial table columns
    based on polygons.
    
    What changes were proposed in this PR?
    Support IN_POLYGON_LIST and IN_POLYLINE_LIST with SELECT QUERY on the
    polygon table.
    Support IN_POLYGON filter as join condition for spatial JOIN queries.
    
    Does this PR introduce any user interface change?
    Yes.
    
    Is any new testcase added?
    Yes
    
    This closes #4127
---
 docs/spatial-index-guide.md                        |  65 +++++
 .../org/apache/carbondata/geo/GeoConstants.java    |   2 +
 .../org/apache/carbondata/geo/GeoHashUtils.java    |  46 +++
 .../geo/scan/expression/PolygonExpression.java     |   3 +-
 .../geo/scan/expression/PolygonListExpression.java |   6 +-
 .../expression/PolygonRangeListExpression.java     |  37 ++-
 .../org/apache/carbondata/geo/GeoUtilUDFs.scala    |  17 ++
 .../org/apache/carbondata/geo/InPolygonUDF.scala   |  17 ++
 .../joins/BroadCastPolygonFilterPushJoin.scala     | 236 +++++++++++++++
 .../spark/sql/execution/strategy/DMLStrategy.scala | 124 +++++++-
 .../apache/spark/sql/optimizer/CarbonFilters.scala |  65 ++++-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |  31 +-
 .../apache/spark/sql/CarbonToSparkAdapter.scala    |  30 +-
 integration/spark/src/test/resources/geodata3.csv  |  35 +++
 .../org/apache/carbondata/geo/GeoQueryTest.scala   | 324 +++++++++++++++++++++
 15 files changed, 1005 insertions(+), 33 deletions(-)

diff --git a/docs/spatial-index-guide.md b/docs/spatial-index-guide.md
index 60bc4ba..db7392c 100644
--- a/docs/spatial-index-guide.md
+++ b/docs/spatial-index-guide.md
@@ -106,18 +106,83 @@ Query with Polygon List UDF predicate
 select * from source_index where IN_POLYGON_LIST('POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)), POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 116.560993 39.935276))', 'OR')
 ```
 
+or
+
+```
+select * from source_index where IN_POLYGON_LIST('select polygon from polyon_table', 'OR')
+```
+
+Polygon table example for above sub-query:
+
+| polygon: String Type | poiId: Int Type |
+|---------------|----------------|
+| POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)) | 1 |
+| POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 116.560993 39.935276)) |  2  |
+
+
 Query with Polyline List UDF predicate
 
 ```
 select * from source_index where IN_POLYLINE_LIST('LINESTRING (116.137676 40.163503, 116.137676 39.935276, 116.260993 39.935276), LINESTRING (116.260993 39.935276, 116.560993 39.935276, 116.560993 40.163503)', 65)
 ```
 
+or
+
+```
+select * from source_index where IN_POLYLINE_LIST('select polyLine from polyon_table', 65)
+```
+
+PolyLine table example for above sub-query:
+
+| polyLine: String Type | poiId: Int Type |
+|---------------|----------------|
+| LINESTRING (116.137676 40.163503, 116.137676 39.935276, 116.260993 39.935276) | 1 |
+| LINESTRING (116.260993 39.935276, 116.560993 39.935276, 116.560993 40.163503) |  2  |
+
 Query with Polygon Range List UDF predicate
 
 ```
 select * from source_index where IN_POLYGON_RANGE_LIST('RANGELIST (855279368848 855279368850, 855280799610 855280799612, 855282156300 855282157400), RANGELIST (855279368852 855279368854, 855280799613 855280799615, 855282156200 855282157500)', 'OR')
 ```
 
+Query having Join on Spatial and Polygon table with Polygon Join UDF predicate
+
+```
+select sum(t1.col1), t2.poiId
+from spatial_table t1
+inner join
+(select polygon, poiId from polygon_table where poiType='abc') t2
+on IN_POLYGON_JOIN(t1.mygeohash, t2.polygon)
+group by t2.poiId
+```
+
+Polygon table example for above query:
+
+| polygon: String Type | poiType: String Type | poiId: Int Type |
+|---------------|----------------|----------------|
+| POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)) | abc | 1 |
+| POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 116.560993 39.935276)) |  def  | 2  |
+| POLYGON ((116.560993 40.935276, 116.360993 40.163503, 116.137676 40.163403, 116.560993 39.935276)) |  abc  | 3  |
+
+Query having Join on Spatial and Polygon table with Polygon Join RangeList UDF predicate
+
+```
+select sum(t1.col1), t2.poiId
+from spatial_table t1
+inner join
+(select polygonRanges, poiId from polygon_table) t2
+on IN_POLYGON_JOIN_RANGE_LIST(t1.mygeohash, t2.polygonRanges)
+group by t2.poiId
+```
+
+Polygon table example for above query:
+
+| polygonRanges: String Type | poiType: String Type | poiId: Int Type |
+|---------------|----------------|----------------|
+| RANGELIST (855279368848 855279368850, 855280799610 855280799612, 855282156300 855282157400) | abc | 1 |
+| rangelist (855279368852 855279368854, 855280799613 855280799615, 855282156200 855282157500) |  def  | 2  |
+| RANGELIST (855279368848 855279368850, 855280799613 855280799617, 855282156300 855282157400) |  abc  | 3  |
+
 Convert spatial index to spatial grid x, y
 
 ```
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
index 0520f1e..67baa82 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoConstants.java
@@ -41,6 +41,8 @@ public class GeoConstants {
 
   public static final String GRID_SIZE = "gridSize";
 
+  public static final String RANGE_LIST = "rangelist";
+
   // delimiter of input points or ranges
   public static final String DEFAULT_DELIMITER = ",";
 
diff --git a/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
index 09baf5c..be02be8 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/GeoHashUtils.java
@@ -23,10 +23,12 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression;
 
 import static org.apache.carbondata.geo.GeoConstants.POSITIVE_INTEGER_REGEX;
 
@@ -277,6 +279,21 @@ public class GeoHashUtils {
     return str.trim().split("\\s+");
   }
 
+  public static String getRangeListAsString(List<Long[]> rangeList) {
+    StringBuilder rangeString = null;
+    for (Long[] range : rangeList) {
+      if (rangeString != null) {
+        rangeString.append(",");
+      }
+      if (rangeString == null) {
+        rangeString = new StringBuilder(StringUtils.join(range, " "));
+      } else {
+        rangeString.append(StringUtils.join(range, " "));
+      }
+    }
+    return rangeString.toString();
+  }
+
   public static void validateRangeList(List<Long[]> ranges) {
     for (Long[] range : ranges) {
       if (range.length != 2) {
@@ -432,4 +449,33 @@ public class GeoHashUtils {
     }
     return false;
   }
+
+  /**
+   * Evaluate whether the search value(geoId) is present in the GeoId polygon ranges.
+   */
+  public static boolean performRangeSearch(String polygonRanges, String geoId) {
+    if (null == polygonRanges || polygonRanges.equalsIgnoreCase("null")) {
+      return false;
+    }
+    List<Long[]> ranges = PolygonRangeListExpression.getRangeListFromString(polygonRanges);
+    // check if the geoId is present within the ranges
+    return GeoHashUtils.rangeBinarySearch(ranges, Long.parseLong(geoId));
+  }
+
+  /**
+   * Apply pattern for a input polygon row
+   * @param regex to be applied to a pattern
+   * @param polygonOrRanges could be polygon or GeoId RangeList
+   * @return polygon or GeoId range
+   */
+  public static String getRange(String regex, String polygonOrRanges) {
+    // parser and get the range list
+    Pattern pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
+    Matcher matcher = pattern.matcher(polygonOrRanges);
+    String range = polygonOrRanges;
+    while (matcher.find()) {
+      range = matcher.group();
+    }
+    return range;
+  }
 }
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
index 8f4e410..a9e3579 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -118,13 +118,14 @@ public class PolygonExpression extends UnknownExpression implements ConditionalE
     out.writeObject(polygon);
     out.writeObject(instance);
     out.writeObject(column);
+    out.writeObject(ranges);
   }
 
   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
     polygon = (String) in.readObject();
     instance = (CustomIndex<List<Long[]>>) in.readObject();
     column = (ColumnExpression) in.readObject();
-    ranges = new ArrayList<Long[]>();
+    ranges = (List<Long[]>) in.readObject();
   }
 
   @Override
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
index 3250b38..3e22dd3 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
@@ -28,6 +28,8 @@ import org.apache.carbondata.geo.GeoConstants;
 import org.apache.carbondata.geo.GeoHashUtils;
 import org.apache.carbondata.geo.GeoOperationType;
 
+import org.apache.commons.lang.StringUtils;
+
 /**
  * InPolygonList expression processor. It inputs the InPolygonList string to the Geo
  * implementation's query method, gets a list of range of IDs from each polygon and
@@ -55,7 +57,9 @@ public class PolygonListExpression extends PolygonExpression {
       Matcher matcher = pattern.matcher(polygon);
       while (matcher.find()) {
         String matchedStr = matcher.group();
-        polygons.add(matchedStr);
+        if (!(matchedStr == null || StringUtils.isEmpty(matchedStr))) {
+          polygons.add(matchedStr);
+        }
       }
       if (polygons.size() < 2) {
         throw new RuntimeException("polygon list need at least 2 polygons, really has " +
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
index 704128d..8e3d10a 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
@@ -40,22 +40,41 @@ public class PolygonRangeListExpression extends PolygonExpression {
 
   private String opType;
 
+  /**
+   * If range start's with RANGELIST_REG_EXPRESSION or not
+   */
+  private boolean computeRange = true;
+
+  private List<String> polygonRangeLists = new ArrayList<>();
+
   public PolygonRangeListExpression(String polygonRangeList, String opType, String columnName,
       CustomIndex indexInstance) {
     super(polygonRangeList, columnName, indexInstance);
     this.opType = opType;
   }
 
+  public PolygonRangeListExpression(String polygonRangeList, String opType, String columnName,
+      CustomIndex indexInstance, boolean computeRange, List<String> polygonRangeLists) {
+    super(polygonRangeList, columnName, indexInstance);
+    this.opType = opType;
+    this.computeRange = computeRange;
+    this.polygonRangeLists = polygonRangeLists;
+  }
+
   @Override
   public void processExpression() {
     // 1. parse the range list string
     List<String> rangeLists = new ArrayList<>();
-    Pattern pattern =
-        Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
-    Matcher matcher = pattern.matcher(polygon);
-    while (matcher.find()) {
-      String matchedStr = matcher.group();
-      rangeLists.add(matchedStr);
+    if (computeRange) {
+      Pattern pattern =
+          Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION, Pattern.CASE_INSENSITIVE);
+      Matcher matcher = pattern.matcher(polygon);
+      while (matcher.find()) {
+        String matchedStr = matcher.group();
+        rangeLists.add(matchedStr);
+      }
+    } else {
+      rangeLists.addAll(polygonRangeLists);
     }
     // 2. process the range lists
     if (rangeLists.size() > 0) {
@@ -73,7 +92,7 @@ public class PolygonRangeListExpression extends PolygonExpression {
     }
   }
 
-  private void sortRange(List<Long[]> rangeList) {
+  public static void sortRange(List<Long[]> rangeList) {
     rangeList.sort(new Comparator<Long[]>() {
       @Override
       public int compare(Long[] x, Long[] y) {
@@ -82,7 +101,7 @@ public class PolygonRangeListExpression extends PolygonExpression {
     });
   }
 
-  private void combineRange(List<Long[]> rangeList) {
+  public static void combineRange(List<Long[]> rangeList) {
     for (int i = 0, j = i + 1; i < rangeList.size() - 1; i++, j++) {
       long previousEnd = rangeList.get(i)[1];
       long nextStart = rangeList.get(j)[0];
@@ -97,7 +116,7 @@ public class PolygonRangeListExpression extends PolygonExpression {
     rangeList.removeIf(item -> item[0] == null && item[1] == null);
   }
 
-  private List<Long[]> getRangeListFromString(String rangeListString) {
+  public static List<Long[]> getRangeListFromString(String rangeListString) {
     String[] rangeStringList = rangeListString.trim().split(GeoConstants.DEFAULT_DELIMITER);
     List<Long[]> rangeList = new ArrayList<>();
     for (String rangeString : rangeStringList) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
index e25768a..cc25c14 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
@@ -30,6 +30,7 @@ object GeoUtilUDFs {
     sparkSession.udf.register("LatLngToGeoId", new LatLngToGeoIdUDF)
     sparkSession.udf.register("ToUpperLayerGeoId", new ToUpperLayerGeoIdUDF)
     sparkSession.udf.register("ToRangeList", new ToRangeListUDF)
+    sparkSession.udf.register("ToRangeListAsString", new ToRangeListAsStringUDF)
   }
 }
 
@@ -69,3 +70,19 @@ class ToRangeListUDF extends ((java.lang.String, java.lang.Double, java.lang.Int
     GeoHashUtils.getRangeList(polygon, oriLatitude, gridSize).asScala.map(_.map(Long2long))
   }
 }
+
+class ToRangeListAsStringUDF
+  extends ((java.lang.String, java.lang.Double, java.lang.Integer) => String) with Serializable {
+  override def apply(polygon: java.lang.String, oriLatitude: java.lang.Double,
+      gridSize: java.lang.Integer): String = {
+    // parse and get the polygon
+    val range: String = GeoHashUtils.getRange(GeoConstants.POLYGON_REG_EXPRESSION, polygon)
+    if (range == null || range.equalsIgnoreCase("null")) {
+      return null
+    }
+    // get geoID range list for the input polygon
+    val buffer = GeoHashUtils.getRangeList(range, oriLatitude, gridSize)
+    // convert to string
+    GeoHashUtils.getRangeListAsString(buffer)
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
index 8bed2fb..5dcc882 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
@@ -28,6 +28,8 @@ object GeoFilterUDFs {
     sparkSession.udf.register("in_polygon_list", new InPolygonListUDF)
     sparkSession.udf.register("in_polyline_list", new InPolylineListUDF)
     sparkSession.udf.register("in_polygon_range_list", new InPolygonRangeListUDF)
+    sparkSession.udf.register("in_polygon_join", new InPolygonJoinUDF)
+    sparkSession.udf.register("in_polygon_join_range_list", new InPolygonJoinRangeListUDF)
   }
 }
 
@@ -39,6 +41,21 @@ class InPolygonUDF extends (String => Boolean) with Serializable {
 }
 
 @InterfaceAudience.Internal
+class InPolygonJoinRangeListUDF extends ((String, String) => Boolean) with Serializable {
+  override def apply(geoId: String, polygonRanges: String): Boolean = {
+    GeoHashUtils.performRangeSearch(GeoHashUtils.getRange(GeoConstants.RANGELIST_REG_EXPRESSION,
+      polygonRanges), geoId)
+  }
+}
+
+@InterfaceAudience.Internal
+class InPolygonJoinUDF extends ((String, String) => Boolean) with Serializable {
+  override def apply(geoId: String, polygonRanges: String): Boolean = {
+    GeoHashUtils.performRangeSearch(polygonRanges, geoId)
+  }
+}
+
+@InterfaceAudience.Internal
 class InPolygonListUDF extends ((String, String) => Boolean) with Serializable {
   override def apply(v1: String, v2: String): Boolean = {
     true // Carbon applies the filter. So, Spark do not have to apply filter.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
new file mode 100644
index 0000000..cab5a60
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, JoinedRow, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.{BinaryExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.BroadCastPolygonFilterPushJoin.addPolygonRangeListFilterToPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.{IndexFilter, Segment}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{GeoConstants, GeoUtils}
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class BroadCastPolygonFilterPushJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    buildSide: BuildSide,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan
+) extends BinaryExecNode with HashJoin {
+
+  override protected lazy val (buildPlan, streamedPlan) = buildSide match {
+    case BuildLeft => (left, right)
+    case BuildRight => (right, left)
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  // execute the polygon table sparkPlan and collect data rows
+  private lazy val inputCopy: Array[InternalRow] = {
+    getBuildPlan.map(_.copy()).collect().clone()
+  }
+
+  @transient private lazy val boundCondition: InternalRow => Boolean = {
+    // get the join condition
+    if (condition.isDefined) {
+      newPredicate(condition.get, streamedPlan.output ++ buildPlan.output).eval _
+    } else {
+      (_: InternalRow) => true
+    }
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    // get polygon rangeList from polygon table and add as IN_POLYGON_RANGE_LIST filter to the
+    // spatial table
+    addPolygonRangeListFilterToPlan(buildPlan, streamedPlan, inputCopy, condition)
+    // inner join spatial and polygon plan by applying in_polygon join filter
+    streamedPlan.execute().mapPartitionsInternal {
+      streamedIter =>
+        // get polygon table data rows
+        val buildRows = inputCopy
+        val joinedRow = new JoinedRow
+
+        streamedIter.flatMap { streamedRow =>
+          val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
+          // apply in_polygon_join filter
+          if (condition.isDefined) {
+            joinedRows.filter(boundCondition)
+          } else {
+            joinedRows
+          }
+        }
+    }
+  }
+
+  protected def getBuildPlan: RDD[InternalRow] = {
+    // buildPlan will be the polygon table plan. Match the buildPlan and return the result of
+    // this query as an RDD[InternalRow]
+    buildPlan match {
+      case c@CarbonBroadCastExchangeExec(_, _) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case _ => buildPlan.children.head match {
+        case c@CarbonBroadCastExchangeExec(_, _) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case _ => buildPlan.execute()
+      }
+    }
+  }
+}
+
+object BroadCastPolygonFilterPushJoin {
+
+  /**
+   * This method will add the polygon range list filter to the spatial table scan
+   * @param buildPlan polygon table spark plan
+   * @param streamedPlan spatial table spark plan
+   * @param inputCopy polygon table data rows
+   * @param condition in_polygon_join expression
+   */
+  def addPolygonRangeListFilterToPlan(buildPlan: SparkPlan,
+      streamedPlan: SparkPlan,
+      inputCopy: Array[InternalRow],
+      condition: Option[Expression]): Unit = {
+
+    // get the polygon column from the in_polygon_join join condition
+    val children = condition.get.asInstanceOf[ScalaUDF].children
+    val polygonExpression = children(1)
+
+    // evaluate and get the polygon data rows from polygon table InternalRows
+    val keys = polygonExpression.map { a =>
+      BindReferences.bindReference(a, buildPlan.output)
+    }.toArray
+
+    val filters = keys.map {
+      k =>
+        inputCopy.map(
+          r => {
+            val curr = k.eval(r)
+            curr match {
+              case _: UTF8String => Literal(curr.toString).asInstanceOf[Expression]
+              case _ => Literal(curr).asInstanceOf[Expression]
+            }
+          })
+    }
+
+    // get the spatial table scan
+    val tableScan = streamedPlan.collectFirst {
+      case ProjectExec(_, batchData: CarbonDataSourceScan) =>
+        batchData
+      case ProjectExec(_, rowData: RowDataSourceScanExec) =>
+        rowData
+      case batchData: CarbonDataSourceScan =>
+        batchData
+      case rowData: RowDataSourceScanExec =>
+        rowData
+    }
+    val configuredFilterRecordSize = CarbonProperties.getInstance.getProperty(
+      CarbonCommonConstants.BROADCAST_RECORD_SIZE,
+      CarbonCommonConstants.DEFAULT_BROADCAST_RECORD_SIZE)
+
+    // add filter to spatial table scan
+    if (tableScan.isDefined && null != filters
+        && filters.length > 0
+        && (filters(0).length > 0 && filters(0).length <= configuredFilterRecordSize.toInt)) {
+      tableScan.get match {
+        case scan: CarbonDataSourceScan =>
+          addPushDownToCarbonRDD(scan.inputRDDs().head, filters(0), scan.relation.carbonTable)
+        case _ =>
+      }
+    }
+  }
+
+  private def addPushDownToCarbonRDD(rdd: RDD[InternalRow],
+      polygonFilter: Array[Expression], table: CarbonTable): Unit = {
+    rdd match {
+      case value: CarbonScanRDD[InternalRow] =>
+        // prepare Polygon Range List filter
+        if (polygonFilter.nonEmpty) {
+          // get the GeoHandler custom instance
+          val (columnName, instance) = GeoUtils.getGeoHashHandler(table
+            .getTableInfo.getFactTable.getTableProperties.asScala)
+          var inputPolygonRanges = new ArrayBuffer[String]
+          // remove NULL values in the polygon range list and convert list of ranges to string
+          polygonFilter.map { expression =>
+            val range: String = expression.asInstanceOf[Literal].value.toString
+            inputPolygonRanges += range
+            expression
+          }
+          inputPolygonRanges = inputPolygonRanges.filterNot(range =>
+            range.equalsIgnoreCase("NULL") ||
+            range.equalsIgnoreCase("'null'"))
+          val polygonRanges = inputPolygonRanges.mkString("\\,")
+          // get the PolygonRangeListExpression for input polygon range
+          val expressionVal = if (polygonRanges.toLowerCase.startsWith(GeoConstants.RANGE_LIST)) {
+            new PolygonRangeListExpression(polygonRanges,
+              "OR",
+              columnName,
+              instance)
+          } else {
+            new PolygonRangeListExpression(polygonRanges,
+              "OR",
+              columnName,
+              instance,
+              false,
+              inputPolygonRanges.asJava)
+          }
+          // set the filter as PolygonRangeListExpression
+          if (null != expressionVal) {
+            val filter = new IndexFilter(table, expressionVal)
+            value.indexFilter = filter
+          }
+        }
+      case _ =>
+    }
+  }
+}
+
+object CarbonBroadCastExchangeExec {
+  def unapply(plan: SparkPlan): Option[(BroadcastMode, SparkPlan)] = {
+    // if plan contains BroadcastExchange, get the sparkPlan
+    plan match {
+      case cExe: BroadcastExchangeExec =>
+        Some(cExe.mode, cExe.child)
+      case _ => None
+    }
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
index e2e41dc..33c52e5 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
@@ -19,25 +19,34 @@ package org.apache.spark.sql.execution.strategy
 
 import java.util.Locale
 
+import scala.collection.mutable
+
 import org.apache.log4j.Logger
-import org.apache.spark.sql.{CarbonCountStar, CarbonDatasourceHadoopRelation, CountStarPlan, InsertIntoCarbonTable, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Descending, Expression, IntegerLiteral, NamedExpression, SortOrder}
+import org.apache.spark.sql.{CarbonCountStar, CarbonDatasourceHadoopRelation, CarbonToSparkAdapter, CountStarPlan, InsertIntoCarbonTable, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
+import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, AttributeReference, Cast, Descending, Expression, IntegerLiteral, Literal, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, PhysicalOperation}
-import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi}
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, Limit, LogicalPlan, Project, ReturnAnswer, Sort}
-import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, FilterExec, ProjectExec, SparkPlan, SparkStrategy}
+import org.apache.spark.sql.catalyst.plans.{Inner, JoinType, LeftSemi}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, Limit, LogicalPlan, Project, ReturnAnswer, Sort}
+import org.apache.spark.sql.execution.{CarbonTakeOrderedAndProjectExec, FilterExec, PlanLater, ProjectExec, SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, ExecutedCommandExec, LoadDataCommand}
 import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight}
+import org.apache.spark.sql.execution.joins.{BroadCastPolygonFilterPushJoin, BuildLeft, BuildRight}
 import org.apache.spark.sql.execution.strategy.CarbonPlanHelper.isCarbonTable
 import org.apache.spark.sql.hive.MatchLogicalRelation
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, StringType}
+import org.apache.spark.sql.util.SparkSQLUtil
+import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{InPolygonJoinRangeListUDF, InPolygonJoinUDF, ToRangeListAsStringUDF}
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 object DMLStrategy extends SparkStrategy {
   val LOGGER: Logger = LogServiceFactory.getLogService(this.getClass.getName)
@@ -56,6 +65,104 @@ object DMLStrategy extends SparkStrategy {
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && driverSideCountStar(l) =>
         val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
         CarbonCountStar(colAttr, relation.carbonTable, SparkSession.getActiveSession.get) :: Nil
+      case Join(left, right, joinType, condition)
+        if condition.isDefined && condition.get.isInstanceOf[ScalaUDF] &&
+           isPolygonJoinUdfFilter(condition) =>
+        if (joinType != Inner) {
+          throw new UnsupportedOperationException("Unsupported query")
+        }
+        val carbon = CarbonSourceStrategy.apply(left).head
+        val leftKeys = Seq(condition.get.asInstanceOf[ScalaUDF].children.head)
+        val rightKeys = Seq(condition.get.asInstanceOf[ScalaUDF].children.last)
+        if (condition.get.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonJoinUDF]) {
+          // If join condition is IN_POLYGON_JOIN udf, then add a implicit projection to the
+          // polygon table logical plan
+          val tableInfo = carbon.collectFirst {
+            case scan: CarbonDataSourceScan => scan.inputRDDs().head
+          }.get.asInstanceOf[CarbonScanRDD[InternalRow]].getTableInfo
+          // create ToRangeListAsString udf as implicit projection with the required fields
+          val toRangeListUDF = new ToRangeListAsStringUDF
+          val dataType = StringType
+          var children: Seq[Expression] = mutable.Seq.empty
+          val geoHashColumn = condition.get.children.head match {
+            case Cast(attr: AttributeReference, _, _) =>
+              attr
+            case attr: AttributeReference =>
+              attr
+          }
+          // get origin Latitude and gridSize from spatial table properties
+          val commonKey = CarbonCommonConstants.SPATIAL_INDEX +
+                          CarbonCommonConstants.POINT +
+                          geoHashColumn.name +
+                          CarbonCommonConstants.POINT
+          val originLatitude = tableInfo.getFactTable
+            .getTableProperties
+            .get(commonKey + "originlatitude")
+          val gridSize = tableInfo.getFactTable.getTableProperties.get(commonKey + "gridsize")
+          if (originLatitude == null || gridSize == null) {
+            throw new UnsupportedOperationException(
+              s"Join condition having left column ${ geoHashColumn.name } is not GeoId column")
+          }
+          // join condition right side will be the polygon column
+          children = children :+ condition.get.children.last
+          children = children :+ Literal(originLatitude.toDouble)
+          children = children :+ Literal(gridSize.toInt)
+
+          var inputTypes: Seq[DataType] = Seq.empty
+          inputTypes = inputTypes :+ StringType
+          inputTypes = inputTypes :+ DoubleType
+          inputTypes = inputTypes :+ IntegerType
+          val rangeListScalaUdf = CarbonToSparkAdapter.createRangeListScalaUDF(toRangeListUDF,
+            dataType, children, inputTypes)
+          // add ToRangeListAsString udf column to the polygon table plan projection list
+          val rightSide = right transform {
+            case Project(projectList, child) =>
+              val positionId = UnresolvedAlias(rangeListScalaUdf)
+              val newProjectList = projectList :+ positionId
+              Project(newProjectList, child)
+          }
+          val sparkSession = SparkSQLUtil.getSparkSession
+          lazy val analyzer = sparkSession.sessionState.analyzer
+          lazy val optimizer = sparkSession.sessionState.optimizer
+          val analyzedPlan = CarbonReflectionUtils.invokeAnalyzerExecute(
+            analyzer, rightSide)
+          val polygonTablePlan = optimizer.execute(analyzedPlan)
+          // transform join condition by replacing polygon column with ToRangeListAsString udf
+          // column output
+          val newCondition = condition.get transform {
+            case scalaUdf: ScalaUDF if scalaUdf.function.isInstanceOf[InPolygonJoinUDF] =>
+              var udfChildren: Seq[Expression] = Seq.empty
+              udfChildren = udfChildren :+ scalaUdf.children.head
+              udfChildren = udfChildren :+ polygonTablePlan.output.last
+              val types = scalaUdf.inputTypes :+ scalaUdf.inputTypes.head
+              val polygonJoinUdf = new InPolygonJoinUDF
+              CarbonToSparkAdapter.getTransformedPolygonJoinUdf(scalaUdf,
+                udfChildren, types, polygonJoinUdf)
+          }
+          // push down in_polygon join filter to carbon
+          val pushedDownJoin = BroadCastPolygonFilterPushJoin(
+            leftKeys,
+            rightKeys,
+            joinType,
+            BuildRight,
+            Some(newCondition),
+            carbon,
+            PlanLater(polygonTablePlan)
+          )
+          Some(newCondition).map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
+        } else {
+          // push down in_polygon join filter to carbon
+          val pushedDownJoin = BroadCastPolygonFilterPushJoin(
+            leftKeys,
+            rightKeys,
+            joinType,
+            BuildRight,
+            condition,
+            carbon,
+            PlanLater(right)
+          )
+          condition.map(FilterExec(_, pushedDownJoin)).getOrElse(pushedDownJoin) :: Nil
+        }
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition,
       left, right)
         if isCarbonPlan(left) && CarbonIndexUtil.checkIsIndexTable(right) =>
@@ -128,6 +235,11 @@ object DMLStrategy extends SparkStrategy {
     }
   }
 
+  private def isPolygonJoinUdfFilter(condition: Option[Expression]) = {
+    condition.get.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonJoinUDF] ||
+    condition.get.asInstanceOf[ScalaUDF].function.isInstanceOf[InPolygonJoinRangeListUDF]
+  }
+
   /**
    * Return true if driver-side count star optimization can be used.
    * Following case can't use driver-side count star:
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 3e30a97..796c91e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -22,16 +22,22 @@ import java.util.ArrayList
 import scala.collection.JavaConverters._
 import scala.util.Try
 
-import org.apache.spark.sql.{CarbonBoundReference, CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession, SparkUnknownExpression}
+import org.apache.spark.sql.{CarbonBoundReference, CarbonDatasourceHadoopRelation, CarbonEnv,
+  Dataset, SparkSession, SparkUnknownExpression}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonSparkDataSourceUtil
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
-import org.apache.spark.sql.catalyst.expressions.{And, ArrayContains, Attribute, AttributeReference, Cast, Contains, EmptyRow, EndsWith, EqualTo, GreaterThan, GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or, ScalaUDF, StartsWith, StringTrim}
+import org.apache.spark.sql.catalyst.expressions.{And, ArrayContains, Attribute,
+  AttributeReference, Cast, Contains, EmptyRow, EndsWith, EqualTo, GreaterThan,
+  GreaterThanOrEqual, In, InSet, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not, Or,
+  ScalaUDF, StartsWith, StringTrim}
 import org.apache.spark.sql.catalyst.expressions.{Expression => SparkExpression}
 import org.apache.spark.sql.execution.CastExpressionOptimization
 import org.apache.spark.sql.hive.{CarbonHiveIndexMetadataUtil, CarbonSessionCatalogUtil}
-import org.apache.spark.sql.types.{ArrayType, BooleanType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{ArrayType, BooleanType, DecimalType, DoubleType, FloatType,
+  IntegerType, LongType, MapType, StringType, StructType, TimestampType}
 import org.apache.spark.sql.types.{DataType => SparkDataType}
+import org.apache.spark.sql.util.SparkSQLUtil
 import org.apache.spark.util.{CarbonReflectionUtils, SparkUtil}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
@@ -39,13 +45,20 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.scan.expression.{ColumnExpression, Expression, LiteralExpression, MatchExpression}
-import org.apache.carbondata.core.scan.expression.conditional.{EqualToExpression, GreaterThanEqualToExpression, GreaterThanExpression, ImplicitExpression, InExpression, LessThanEqualToExpression, LessThanExpression, ListExpression, NotEqualsExpression, NotInExpression, StartsWithExpression}
-import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.core.scan.expression.{ColumnExpression, Expression,
+  LiteralExpression, MatchExpression}
+import org.apache.carbondata.core.scan.expression.conditional.{EqualToExpression,
+  GreaterThanEqualToExpression, GreaterThanExpression, ImplicitExpression, InExpression,
+  LessThanEqualToExpression, LessThanExpression, ListExpression, NotEqualsExpression,
+  NotInExpression, StartsWithExpression}
+import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression,
+  OrExpression}
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.geo.{GeoUtils, InPolygonListUDF, InPolygonRangeListUDF, InPolygonUDF, InPolylineListUDF}
-import org.apache.carbondata.geo.scan.expression.{PolygonExpression, PolygonListExpression, PolygonRangeListExpression, PolylineListExpression}
+import org.apache.carbondata.geo.{GeoUtils, InPolygonListUDF, InPolygonRangeListUDF,
+  InPolygonUDF, InPolylineListUDF}
+import org.apache.carbondata.geo.scan.expression.{PolygonExpression, PolygonListExpression,
+  PolygonRangeListExpression, PolylineListExpression}
 import org.apache.carbondata.index.{TextMatchMaxDocUDF, TextMatchUDF}
 
 /**
@@ -235,13 +248,27 @@ object CarbonFilters {
         if (children.size != 2) {
           throw new MalformedCarbonCommandException("Expect two string in polygon list")
         }
+        var polyGonStr = children.head.toString()
+        // check if the expression is polygon list or select query
+        val isPolyGonQuery = polyGonStr.toLowerCase().startsWith("select")
+        if (isPolyGonQuery) {
+          // collect the polygon list by executing query
+          polyGonStr = getPolygonOrPolyLine(polyGonStr)
+        }
         val (columnName, instance) = getGeoHashHandler(relation.carbonTable)
-        Some(new PolygonListExpression(children.head.toString(), children.last.toString(),
+        Some(new PolygonListExpression(polyGonStr, children.last.toString(),
           columnName, instance))
       case _: InPolylineListUDF =>
         if (children.size != 2) {
           throw new MalformedCarbonCommandException("Expect two string in polyline list")
         }
+        var polyLineStr = children.head.toString()
+        // check if the expression is PolyLine list or select query
+        val isPolyGonQuery = polyLineStr.toLowerCase().startsWith("select")
+        if (isPolyGonQuery) {
+          // collect the polyline linestring by executing query
+          polyLineStr = getPolygonOrPolyLine(polyLineStr, isLineString = true)
+        }
         val (columnName, instance) = getGeoHashHandler(relation.carbonTable)
         if (scala.util.Try(children.last.toString().toFloat).isFailure) {
           throw new MalformedCarbonCommandException("Expect buffer size to be of float type")
@@ -250,7 +277,7 @@ object CarbonFilters {
         if (bufferSize <= 0) {
           throw new MalformedCarbonCommandException("Expect buffer size to be a positive value")
         }
-        Some(new PolylineListExpression(children.head.toString(),
+        Some(new PolylineListExpression(polyLineStr,
           children.last.toString().toFloat, columnName, instance))
       case _: InPolygonRangeListUDF =>
         if (children.size != 2) {
@@ -268,6 +295,24 @@ object CarbonFilters {
     GeoUtils.getGeoHashHandler(tableProperties)
   }
 
+  def getPolygonOrPolyLine(query: String, isLineString: Boolean = false): String = {
+    val dataFrame = SparkSQLUtil.getSparkSession.sql(query)
+    // check if more than one column present in query projection
+    if (dataFrame.columns.length != 1) {
+      val udf = if (isLineString) {
+        "PolyLine"
+      } else {
+        "Polygon"
+      }
+      throw new UnsupportedOperationException(
+        s"More than one column exists in the query for $udf List Udf")
+    }
+    dataFrame.collect()
+      .mkString(",")
+      .replace("[", "")
+      .replace("]", "")
+  }
+
   def translateOr(
       left : SparkExpression,
       right: SparkExpression,
diff --git a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 2eced66..0984f94 100644
--- a/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark/src/main/spark2.3/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -23,9 +23,8 @@ import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, NamedExpression, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.ExprCode
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, ExprId, Expression, ExpressionSet, NamedExpression, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -33,9 +32,9 @@ import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
 import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
-import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.spark.sql.types.{DataType, Metadata, StringType}
 
-import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
 
 object CarbonToSparkAdapter {
 
@@ -92,6 +91,30 @@ object CarbonToSparkAdapter {
     ScalaUDF(s.function, s.dataType, Seq(reference), s.inputTypes)
   }
 
+  def createRangeListScalaUDF(toRangeListUDF: ToRangeListAsStringUDF,
+      dataType: StringType.type,
+      children: Seq[Expression],
+      inputTypes: Seq[DataType]): ScalaUDF = {
+    ScalaUDF(toRangeListUDF,
+      dataType,
+      children,
+      inputTypes,
+      Some("ToRangeListAsString"))
+  }
+
+  def getTransformedPolygonJoinUdf(scalaUdf: ScalaUDF,
+      udfChildren: Seq[Expression],
+      types: Seq[DataType],
+      polygonJoinUdf: InPolygonJoinUDF): ScalaUDF = {
+    ScalaUDF(polygonJoinUdf,
+      scalaUdf.dataType,
+      udfChildren,
+      types,
+      scalaUdf.udfName,
+      scalaUdf.nullable,
+      scalaUdf.udfDeterministic)
+  }
+
   def createExprCode(code: String, isNull: String, value: String, dataType: DataType = null
   ): ExprCode = {
     ExprCode(code, isNull, value)
diff --git a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
index 163d833..8b4b5fd 100644
--- a/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
+++ b/integration/spark/src/main/spark2.4/org/apache/spark/sql/CarbonToSparkAdapter.scala
@@ -23,7 +23,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql.carbondata.execution.datasources.CarbonFileIndexReplaceRule
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, ExternalCatalogWithListener, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExpressionSet, ExprId, NamedExpression, ScalaUDF, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, ExprId, NamedExpression, ScalaUDF, SubqueryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
@@ -33,9 +33,10 @@ import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.optimizer.{CarbonIUDRule, CarbonUDFTransformRule, MVRewriteRule}
 import org.apache.spark.sql.secondaryindex.optimizer.CarbonSITransformationRule
-import org.apache.spark.sql.types.{DataType, Metadata}
+import org.apache.spark.sql.types.{DataType, Metadata, StringType}
 
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo
+import org.apache.carbondata.geo.{InPolygonJoinUDF, ToRangeListAsStringUDF}
 
 object CarbonToSparkAdapter {
 
@@ -109,6 +110,31 @@ object CarbonToSparkAdapter {
     ScalaUDF(s.function, s.dataType, Seq(reference), s.inputsNullSafe, s.inputTypes)
   }
 
+  def createRangeListScalaUDF(toRangeListUDF: ToRangeListAsStringUDF,
+      dataType: StringType.type,
+      children: Seq[Expression],
+      inputTypes: Seq[DataType]): ScalaUDF = {
+    val inputsNullSafe: Seq[Boolean] = Seq(true, true, true)
+    ScalaUDF(toRangeListUDF,
+      dataType,
+      children,
+      inputsNullSafe,
+      inputTypes,
+      Some("ToRangeListAsString"))
+  }
+
+  def getTransformedPolygonJoinUdf(scalaUdf: ScalaUDF,
+      udfChildren: Seq[Expression],
+      types: Seq[DataType],
+      polygonJoinUdf: InPolygonJoinUDF): ScalaUDF = {
+    ScalaUDF(polygonJoinUdf,
+      scalaUdf.dataType,
+      udfChildren,
+      scalaUdf.inputsNullSafe,
+      types,
+      scalaUdf.udfName)
+  }
+
   def createExprCode(code: String, isNull: String, value: String, dataType: DataType): ExprCode = {
     ExprCode(
       code"$code",
diff --git a/integration/spark/src/test/resources/geodata3.csv b/integration/spark/src/test/resources/geodata3.csv
new file mode 100644
index 0000000..a14da35
--- /dev/null
+++ b/integration/spark/src/test/resources/geodata3.csv
@@ -0,0 +1,35 @@
+col1,col2,longitude,latitude
+1,12,120177080,30326882
+2,19,120180685,30326327
+3,1,120184976,30327105
+4,12,120189311,30327549
+5,1,120194460,30329698
+6,6,120186965,30329133
+7,8,120177481,30328911
+8,9,120169713,30325614
+9,10,120164563,30322243
+10,11,120171558,30319613
+11,1,120176365,30320687
+12,2,120179669,30323688
+13,34,120181001,30320761
+2,3,120187094,30323540
+5,4,120193574,30323651
+8,6,120186192,30320132
+13,7,120190055,30317464
+6,8,120195376,30318094
+12,10,120160786,30317094
+15,11,120168211,30318057
+1,12,120173618,30316612
+12,1,120181001,30317316
+6,14,120185162,30315908
+8,15,120192415,30315871
+9,16,120161902,30325614
+10,1,120164306,30328096
+12,2,120197093,30325985
+4,4,120196020,30321651
+6,5,120198638,30323540
+7,7,120165421,30314834
+2,5,116285807,40084087
+1,3,116337069,39951887
+1,2,116288955,39999101
+5,6,116325378,39963129
\ No newline at end of file
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
new file mode 100644
index 0000000..ddfbdf7
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/geo/GeoQueryTest.scala
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class GeoQueryTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+
+  val geoTable = "geoTable"
+  val polygonTable = "polygonTable"
+  val polylineTable = "polylineTable"
+
+  override def beforeAll(): Unit = {
+    drop()
+  }
+
+  test("test polygon list udf with select query as input") {
+    createTable()
+    loadData()
+    createPolygonTable
+    sql(s"insert into $polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245," +
+        s"120.181411 30.314540, 120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))" +
+        s"','abc','1'")
+    sql(s"insert into $polygonTable select 'polygon ((120.191603 30.328946,120.184179 30.327465," +
+        s"120.181819 30.321464,120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))'," +
+        s"'abc','1'")
+    sql(s"insert into $polygonTable select null,'abc','1'")
+    sql(s"insert into $polygonTable select '','abc','1'")
+    checkAnswer(sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
+                    s"'select polygon from $polygonTable','OR')"),
+      Seq(Row(120177080, 30326882), Row(120180685, 30326327), Row(120184976, 30327105),
+        Row(120176365, 30320687), Row(120179669, 30323688), Row(120181001, 30320761),
+        Row(120187094, 30323540), Row(120186192, 30320132), Row(120181001, 30317316),
+        Row(120189311, 30327549), Row(120193574, 30323651), Row(120190055, 30317464),
+        Row(120196020, 30321651)))
+  }
+
+  test("test polygon list udf with select query and and invalid input") {
+    createTable()
+    loadData()
+    createPolygonTable
+    // verify empty data on polygon table
+    intercept[RuntimeException] {
+      sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
+          s"'select polygon from $polygonTable','OR')").collect()
+    }.getMessage.contains("polygon list need at least 2 polygons, really has 0")
+    sql(s"insert into $polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245," +
+        s"120.181411 30.314540, 120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))" +
+        s"','abc','1'")
+    intercept[RuntimeException] {
+      sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
+          s"'select polygon from $polygonTable','OR')").collect()
+    }.getMessage.contains("polygon list need at least 2 polygons, really has 1")
+    sql(s"insert into $polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245," +
+        s"120.181411 30.314540, 120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))" +
+        s"','abc','1'")
+    intercept[UnsupportedOperationException] {
+      sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
+          s"'select polygon,poiId from $polygonTable','OR')").collect()
+    }.getMessage.contains("More than one column exists in the query for Polygon List Udf")
+    intercept[RuntimeException] {
+      sql(s"select longitude, latitude from $geoTable where IN_POLYGON_LIST(" +
+          s"'select poiId from $polygonTable','OR')").collect()
+    }.getMessage.contains("polygon list need at least 2 polygons, really has 0")
+  }
+
+  test("test polygon line udf with select query as input") {
+    createTable()
+    loadData()
+    sql(s"""
+         | CREATE TABLE polyLineTable(
+         | polyline string,
+         | poiType string,
+         | poiId String)
+         | STORED AS carbondata
+            """.stripMargin)
+    sql(s"insert into $polylineTable select 'linestring (120.184179 30.327465, 120.191603 " +
+        s"30.328946, 120.199242 30.324464)','abc','1'")
+    sql(s"insert into $polylineTable select 'linestring (120.199242 30.324464, 120.190359 " +
+        s"30.315388)','abc','1'")
+    sql(s"insert into $polylineTable select null,'abc','1'")
+    checkAnswer(
+      sql(s"select longitude, latitude from $geoTable where IN_POLYLINE_LIST(" +
+          s"'select polyline from $polylineTable', 65)"),
+      Seq(Row(120184976, 30327105),
+        Row(120197093, 30325985),
+        Row(120196020, 30321651),
+        Row(120198638, 30323540)))
+  }
+
+  test("test polygon line udf with select query and invalid input") {
+    createTable()
+    loadData()
+    sql(s"""
+         | CREATE TABLE polyLineTable(
+         | polyline string,
+         | poiType string,
+         | poiId String)
+         | STORED AS carbondata
+            """.stripMargin)
+    sql(s"insert into $polylineTable select 'linestring (120.184179 30.327465, 120.191603 " +
+        s"30.328946, 120.199242 30.324464)','abc','1'")
+    intercept[UnsupportedOperationException] {
+      sql(s"select longitude, latitude from $geoTable where IN_POLYLINE_LIST(" +
+          s"'select polyline,poiId from $polylineTable', 65)").collect()
+    }.getMessage.contains("More than one column exists in the query for PolyLine List Udf")
+  }
+
+  test("test join on spatial and polygon table with in_polygon_join udf") {
+    createTable()
+    loadData()
+    createPolygonTable
+    loadPolygonData
+    val df = sql(s"select sum(t1.col1),sum(t1.col2),t2.poiId " +
+                 s"from $geoTable t1 " +
+                 s"inner join " +
+                 s"(select polygon,poiId from $polygonTable where poitype='abc') t2 " +
+                 s"on in_polygon_join(t1.mygeohash,t2.polygon) group by t2.poiId")
+    checkAnswer(df, Seq(Row(64, 79, "1"), Row(39, 37, "2")))
+  }
+
+  test("test block pruning with polygon join query") {
+    createTable()
+    sql(s"insert into $geoTable select 855280799612,1,2,116285807,40084087")
+    sql(s"insert into $geoTable select 855283635086,1,2,116372142,40129503")
+    sql(s"insert into $geoTable select 855279346102,1,2,116187332,39979316")
+    sql(s"insert into $geoTable select 855282156308,1,2,116337069,39951887")
+    sql(s"insert into $geoTable select 855283640154,1,2,116359102,40154684")
+    sql(s"insert into $geoTable select 855282440834,1,2,116736367,39970323")
+    sql(s"insert into $geoTable select 855282072206,1,2,116362699,39942444")
+    sql(s"insert into $geoTable select 855282157702,1,2,116325378,39963129")
+    sql(s"insert into $geoTable select 855279270226,1,2,116302895,39930753")
+    sql(s"insert into $geoTable select 855279368850,1,2,116288955,39999101")
+    createPolygonTable
+    sql(
+      s"insert into $polygonTable select 'POLYGON ((116.321011 40.123503, 116.137676 39.947911, " +
+      "116.560993 39.935276, 116.321011 40.123503))','china','1'")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+    val joinQuery =
+      s"select t1.longitude, t1.latitude,t2.poiId from $geoTable t1 inner join (select polygon," +
+      s"poiId from $polygonTable where poitype='china') t2 on in_polygon_join" +
+      s"(t1.mygeohash,t2.polygon)"
+    assert(sql(s"explain $joinQuery").collect()(0)
+      .toString()
+      .contains("- pruned by Main Index\n    - skipped: 5 blocks, 4 blocklets"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+      CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+    val joinQuery_without_joinUdf =
+      s"select t1.longitude, t1.latitude,t2.poiId from $geoTable t1 inner join (select polygon," +
+      s"poiId from $polygonTable where poitype='china') t2 where in_polygon('116.321011 " +
+      s"40.123503,116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"
+    checkAnswer(sql(joinQuery), sql(joinQuery_without_joinUdf))
+    // set segments for spatial table
+    try {
+      sql(s"set carbon.input.segments.default.$geoTable=0,1")
+      checkAnswer(sql(joinQuery), Seq(Row(116285807L, 40084087L, "1")))
+    } finally {
+      sql(s"set carbon.input.segments.default.$geoTable=*")
+    }
+  }
+
+  test("test join on spatial and polygon table with in_polygon_join_range_list udf") {
+    createTable()
+    loadData()
+    createPolygonTable
+    sql(s"insert into $polygonTable select 'rangelist (855279368850 855279368852, 855280799610 " +
+        s"855280799612,855282156300 855282157400)','xyz','1'")
+    sql(s"insert into $polygonTable select 'RANGELIST (855279368848 855279368850, 855280799613 " +
+        s"855280799615, 855282156301 855282157800)','xyz','2'")
+    sql(s"insert into $polygonTable select 'null','xyz','2'")
+    val df = sql(s"select sum(t1.col1),sum(t1.col2),t2.poiId " +
+                 s"from $geoTable t1 " +
+                 s"inner join " +
+                 s"(select polygon,poiId from $polygonTable where poitype='xyz') t2 " +
+                 s"on in_polygon_join_range_list(t1.mygeohash,t2.polygon) group by t2.poiId")
+    checkAnswer(df, Seq(Row(4, 10, "1"), Row(7, 11, "2")))
+  }
+
+  test("test join on spatial table without data") {
+    createTable()
+    createPolygonTable
+    loadPolygonData
+    val df = sql(s"select sum(t1.col1),sum(t1.col2),t2.poiId " +
+                 s"from $geoTable t1 " +
+                 s"inner join " +
+                 s"(select polygon,poiId from $polygonTable where poitype='abc') t2 " +
+                 s"on in_polygon_join(t1.mygeohash,t2.polygon) group by t2.poiId")
+    assert(df.count() == 0)
+  }
+
+  test("test join with invalid udf data") {
+    createTable()
+    loadData()
+    createPolygonTable
+    loadPolygonData
+    intercept[UnsupportedOperationException](
+      sql(s"select sum(t1.col1),sum(t1.col2),t2.poiId " +
+          s"from $geoTable t1 " +
+          s"inner join " +
+          s"(select polygon,poiId from $polygonTable where poitype='abc') t2 " +
+          s"on in_polygon_join(t2.polygon,t1.mygeohash) group by t2.poiId").collect()
+    ).getMessage.contains("Join condition having left column polygon is not GeoId column")
+  }
+
+  test("test block pruning on spatial and polygon table with in_polygon_join_range_list udf") {
+   createTable()
+    sql(s"insert into $geoTable select 855280799612,1,2,116285807,40084087")
+    sql(s"insert into $geoTable select 855283635086,1,2,116372142,40129503")
+    sql(s"insert into $geoTable select 855279346102,1,2,116187332,39979316")
+    sql(s"insert into $geoTable select 855282156308,1,2,116337069,39951887")
+    createPolygonTable
+    sql(s"insert into $polygonTable select 'RANGELIST (855279368848 855279368850, 855279346102 " +
+        s"855280799615, 855282156301 855282157800)','xyz','2'")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS, "true")
+    sql(s"select sum(col1),2  from $geoTable where IN_POLYGON_RANGE_LIST" +
+      s"('RANGELIST (855279368848 855279368850, 855279346102 " +
+      s"855280799615, 855282156301 855282157800)','OR')")
+    val joinQuery = s"select sum(t1.col1),t2.poiId " +
+              s"from $geoTable t1 " +
+              s"inner join " +
+              s"(select polygon,poiId from $polygonTable where poitype='xyz') t2 " +
+              s"on in_polygon_join_range_list(t1.mygeohash,t2.polygon) group by t2.poiId"
+    assert(sql(s"explain $joinQuery").collect()(0)
+      .toString()
+      .contains("- pruned by Main Index\n    - skipped: 2 blocks, 1 blocklets"))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+      CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+    checkAnswer(sql(joinQuery),
+      sql(s"select sum(col1),'2' from $geoTable where IN_POLYGON_RANGE_LIST" +
+          s"('RANGELIST (855279368848 855279368850, 855279346102 " +
+          s"855280799615, 855282156301 855282157800)','OR')"))
+    sql(s"insert into $polygonTable select 'RANGELIST (855279368848 855279368850, 855279346102 " +
+        s"855280799615, 855282156301 855282157800)','xyz','2'")
+    checkAnswer(sql(joinQuery), Seq(Row(6, "2")))
+  }
+
+  test("test toRangeList as String Udf") {
+    createPolygonTable
+    sql(s"insert into $polygonTable select 'POLYGON ((116.321011 40.123503, 116.320311 " +
+        s"40.122503, 116.321111 40.121503, 116.321011 40.123503))','abc','1'")
+    checkAnswer(sql(s"select ToRangeListAsString(polygon, 39.832277, 50) from $polygonTable"),
+      Seq(Row("855280833998 855280833998,855280834020 855280834020,855280834022 855280834022")))
+  }
+
+  private def createPolygonTable = {
+    sql(s"""
+         | CREATE TABLE $polygonTable(
+         | polygon string,
+         | poiType string,
+         | poiId String)
+         | STORED AS carbondata
+            """.stripMargin)
+  }
+
+  private def loadPolygonData = {
+    sql(s"insert into $polygonTable select 'POLYGON ((120.176433 30.327431,120.171283 30.322245," +
+        s"120.181411 30.314540,120.190509 30.321653,120.185188 30.329358,120.176433 30.327431))'," +
+        "'abc','1'")
+    sql(s"insert into $polygonTable select 'POLYGON ((120.191603 30.328946,120.184179 30.327465," +
+        s"120.181819 30.321464,120.190359 30.315388,120.199242 30.324464,120.191603 30.328946))'," +
+        "'abc','2'")
+    sql(s"insert into $polygonTable select 'null','abcd','2'")
+  }
+
+  override def afterEach(): Unit = {
+    drop()
+  }
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_QUERY_STATISTICS,
+      CarbonCommonConstants.ENABLE_QUERY_STATISTICS_DEFAULT)
+    drop()
+  }
+
+  def createTable(tableName : String = geoTable, customProperties : String = ""): Unit = {
+    sql(s"""
+           | CREATE TABLE $tableName(
+           | col1 INT,
+           | col2 INT,
+           | longitude LONG,
+           | latitude LONG)
+           | STORED AS carbondata
+           | TBLPROPERTIES ($customProperties 'SPATIAL_INDEX'='mygeohash',
+           | 'SPATIAL_INDEX.mygeohash.type'='geohash',
+           | 'SPATIAL_INDEX.mygeohash.sourcecolumns'='longitude, latitude',
+           | 'SPATIAL_INDEX.mygeohash.originLatitude'='39.832277',
+           | 'SPATIAL_INDEX.mygeohash.gridSize'='50',
+           | 'SPATIAL_INDEX.mygeohash.conversionRatio'='1000000',
+           | 'SPATIAL_INDEX.mygeohash.class'='org.apache.carbondata.geo.GeoHashIndex')
+       """.stripMargin)
+  }
+
+  def loadData(tableName : String = geoTable): Unit = {
+    sql(s"""LOAD DATA local inpath '$resourcesPath/geodata3.csv' INTO TABLE $tableName OPTIONS
+           |('DELIMITER'= ',')""".stripMargin)
+  }
+
+  def drop(): Unit = {
+    sql(s"drop table if exists $geoTable")
+    sql(s"drop table if exists $polygonTable")
+    sql(s"drop table if exists $polylineTable")
+  }
+}