You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2021/05/05 15:48:33 UTC

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4127: [CARBONDATA-4166] Geo spatial Query Enhancements

ajantha-bhat commented on a change in pull request #4127:
URL: https://github.com/apache/carbondata/pull/4127#discussion_r626640238



##########
File path: docs/spatial-index-guide.md
##########
@@ -106,18 +107,52 @@ Query with Polygon List UDF predicate
 select * from source_index where IN_POLYGON_LIST('POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)), POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 116.560993 39.935276))', 'OR')
 ```
 
+or
+
+```
+select * from source_index where IN_POLYGON_LIST('select polygon from polyon_table', 'OR')

Review comment:
       Maybe we should describe here what `polygon_table` looks like, so that the schema of the polygon column is clear and the example is easier to understand.
   The same comment applies to the examples further down.

##########
File path: geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonListExpression.java
##########
@@ -55,7 +55,9 @@ public void processExpression() {
       Matcher matcher = pattern.matcher(polygon);
       while (matcher.find()) {
         String matchedStr = matcher.group();
-        polygons.add(matchedStr);
+        if (!(matchedStr == null || matchedStr.isEmpty())) {

Review comment:
       please use `!StringUtils.isEmpty`

##########
File path: geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonRangeListExpression.java
##########
@@ -40,22 +40,41 @@
 
   private String opType;
 
+  /**
+   * If range start's with RANGELIST_REG_EXPRESSION or not
+   */
+  private boolean hasPattern = true;

Review comment:
       `computeRange` may be a more suitable name for this variable.

##########
File path: docs/spatial-index-guide.md
##########
@@ -1,3 +1,4 @@
+

Review comment:
       revert empty line

##########
File path: docs/spatial-index-guide.md
##########
@@ -106,18 +107,52 @@ Query with Polygon List UDF predicate
 select * from source_index where IN_POLYGON_LIST('POLYGON ((116.137676 40.163503, 116.137676 39.935276, 116.560993 39.935276, 116.137676 40.163503)), POLYGON ((116.560993 39.935276, 116.560993 40.163503, 116.137676 40.163503, 116.560993 39.935276))', 'OR')
 ```
 
+or
+
+```
+select * from source_index where IN_POLYGON_LIST('select polygon from polyon_table', 'OR')
+```
+
 Query with Polyline List UDF predicate
 
 ```
 select * from source_index where IN_POLYLINE_LIST('LINESTRING (116.137676 40.163503, 116.137676 39.935276, 116.260993 39.935276), LINESTRING (116.260993 39.935276, 116.560993 39.935276, 116.560993 40.163503)', 65)
 ```
 
+or
+
+```
+select * from source_index where IN_POLYLINE_LIST('select polyLine from polyon_table', 65)
+```
+
 Query with Polygon Range List UDF predicate
 
 ```
 select * from source_index where IN_POLYGON_RANGE_LIST('RANGELIST (855279368848 855279368850, 855280799610 855280799612, 855282156300 855282157400), RANGELIST (855279368852 855279368854, 855280799613 855280799615, 855282156200 855282157500)', 'OR')
 ```
 
+Query having Join on Spatial and Polygon table with Polygon Join UDF predicate
+
+```
+select sum(t1.col1), t2.poiId
+from spatial_table t1
+inner join
+(select polygon, poiId from polygon_table) t2

Review comment:
       Please give the table schema before the example for better understanding.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/geo/GeoUtilUDFs.scala
##########
@@ -30,6 +32,7 @@ object GeoUtilUDFs {
     sparkSession.udf.register("LatLngToGeoId", new LatLngToGeoIdUDF)
     sparkSession.udf.register("ToUpperLayerGeoId", new ToUpperLayerGeoIdUDF)
     sparkSession.udf.register("ToRangeList", new ToRangeListUDF)
+    sparkSession.udf.register("ToRangeListAsString", new ToRangeListAsStringUDF)

Review comment:
       These UDFs are exposed to the user, right? Can we add them to the documentation?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/DMLStrategy.scala
##########
@@ -56,6 +65,104 @@ object DMLStrategy extends SparkStrategy {
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] && driverSideCountStar(l) =>
         val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
         CarbonCountStar(colAttr, relation.carbonTable, SparkSession.getActiveSession.get) :: Nil
+      case Join(left, right, joinType, condition)
+        if condition.isDefined && condition.get.isInstanceOf[ScalaUDF] &&
+           isPolygonUdfFilter(condition) =>

Review comment:
       ```suggestion
              isPolygonJoinUdfFilter(condition) =>
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, JoinedRow, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.{BinaryExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.BroadCastPolygonFilterPushJoin.addPolygonRangeListFilterToPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.{IndexFilter, Segment}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{GeoConstants, GeoUtils}
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class BroadCastPolygonFilterPushJoin(

Review comment:
       Many things are in common with BroadCastSIFilterPushJoin; can't we extend that class?

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/geo/InPolygonUDF.scala
##########
@@ -38,6 +43,46 @@ class InPolygonUDF extends (String => Boolean) with Serializable {
   }
 }
 
+@InterfaceAudience.Internal
+class InPolygonJoinRangeListUDF extends ((String, String) => Boolean) with Serializable {
+  override def apply(geoId: String, polygonRanges: String): Boolean = {
+    if (polygonRanges == null || polygonRanges.equalsIgnoreCase("null")) {
+      return false
+    }
+    // parser and get the range list
+    var range: String = polygonRanges
+    val pattern = Pattern.compile(GeoConstants.RANGELIST_REG_EXPRESSION, Pattern.CASE_INSENSITIVE)
+    val matcher = pattern.matcher(polygonRanges)
+    while ( { matcher.find }) {
+      val matchedStr = matcher.group
+      range = matchedStr
+    }
+    val ranges = PolygonRangeListExpression.getRangeListFromString(range)

Review comment:
       Do we need a `null` check for `ranges` here? Some places check and some don't (example: `ToRangeListAsStringUDF`); can we make it uniform? If the check is not required, remove it from the other places as well. Also, if possible, extract a common method for the matcher/find logic.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/joins/BroadCastPolygonFilterPushJoin.scala
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{BindReferences, Expression, JoinedRow, Literal, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
+import org.apache.spark.sql.execution.{BinaryExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec}
+import org.apache.spark.sql.execution.joins.BroadCastPolygonFilterPushJoin.addPolygonRangeListFilterToPlan
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.strategy.CarbonDataSourceScan
+import org.apache.spark.sql.secondaryindex.joins.BroadCastSIFilterPushJoin
+import org.apache.spark.sql.types.TimestampType
+import org.apache.spark.unsafe.types.UTF8String
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.index.{IndexFilter, Segment}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.geo.{GeoConstants, GeoUtils}
+import org.apache.carbondata.geo.scan.expression.PolygonRangeListExpression
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
+
+case class BroadCastPolygonFilterPushJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    buildSide: BuildSide,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan
+) extends BinaryExecNode with HashJoin {
+
+  override protected lazy val (buildPlan, streamedPlan) = buildSide match {
+    case BuildLeft => (left, right)
+    case BuildRight => (right, left)
+  }
+
+  override lazy val metrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
+
+  private lazy val inputCopy: Array[InternalRow] = {
+    getBuildPlan.map(_.copy()).collect().clone()
+  }
+
+  lazy val spatialTableRDD: Option[RDD[InternalRow]] = streamedPlan.collectFirst {
+    case scan: CarbonDataSourceScan => scan.inputRDDs().head
+  }
+
+  @transient private lazy val boundCondition: InternalRow => Boolean = {
+    if (condition.isDefined) {
+      newPredicate(condition.get, streamedPlan.output ++ buildPlan.output).eval _
+    } else {
+      (_: InternalRow) => true
+    }
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    // get polygon rangeList from polygon table and add as IN_POLYGON_RANGE_LIST filter to the
+    // spatial table
+    addPolygonRangeListFilterToPlan(buildPlan, streamedPlan, inputCopy, condition)
+    // inner join spatial and polygon plan by applying in_polygon join filter
+    streamedPlan.execute().mapPartitionsInternal {
+      streamedIter =>
+        // get polygon table data rows
+        val buildRows = inputCopy
+        val joinedRow = new JoinedRow
+
+        streamedIter.flatMap { streamedRow =>
+          val joinedRows = buildRows.iterator.map(r => joinedRow(streamedRow, r))
+          // apply in_polygon_join filter
+          if (condition.isDefined) {
+            joinedRows.filter(boundCondition)
+          } else {
+            joinedRows
+          }
+        }
+    }
+  }
+
+  protected def getBuildPlan: RDD[InternalRow] = {
+    buildPlan match {
+      case c@CarbonBroadCastExchangeExec(_, _) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+        c.asInstanceOf[BroadcastExchangeExec].child.execute()
+      case _ => buildPlan.children.head match {
+        case c@CarbonBroadCastExchangeExec(_, _) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case ReusedExchangeExec(_, c@CarbonBroadCastExchangeExec(_, _)) =>
+          c.asInstanceOf[BroadcastExchangeExec].child.execute()
+        case _ => buildPlan.execute()
+      }
+    }
+  }
+}
+
+object BroadCastPolygonFilterPushJoin {

Review comment:
       also please add more comments for this class and explanation for each 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org