You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/02/12 14:55:17 UTC

[carbondata] branch master updated: [CARBONDATA-3548] Polygon expression processing using unknown expression and filtering performance improvement

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ff487f  [CARBONDATA-3548] Polygon expression processing using unknown expression and filtering performance improvement
8ff487f is described below

commit 8ff487f3b3d6e0a0d561f0cc763db14aff5a51e9
Author: Venu Reddy <ve...@huawei.com>
AuthorDate: Wed Feb 5 15:39:57 2020 +0530

    [CARBONDATA-3548] Polygon expression processing using unknown expression and filtering performance improvement
    
    Why is this PR needed?
    This PR improves the query processing performance of in_polygon UDF.
    
    What changes were proposed in this PR?
    At present, PolygonExpression processing leverages the existing
    InExpression. PolygonExpression internally creates a InExpression as a
    child to it. InExpression is constructed/build from the result of Quad
    tree algorithm. Algorithm returns the list of ranges(with each range
    having min and max Id for that range). And this list is a sorted one.
    InExpression constitute of 2 childs. One child is a columnExpression(for
    geohash column) and the other is a ListExpression( with List of
    LiternalExpressions. One LiteralExpression for each Id returned from
    algo).
    Problems associated with this approach:
    
    We expand the list of ranges(with each range having minand max) to all
    individual Ids. And create LiteralExpression for each Id. Since we can
    have large ranges(and the numerous ranges), it consumes huge amount of
    memory in processing.
    Due to same reason, it slows does the filter execution.
    
    Modifications with this PR:
    Instead we can use UnknownExpression with RowLevelFilterResolverImpl and
    RowLevelFilterExecuterImpl processing. And override evaluate() method to
    do the binary search on the list of ranges directly. This will
    significanly improve the polygon filter query performance. And Polygon
    filter expression type is not required anymore at Carbon-Core module.
    
    Does this PR introduce any user interface change?
    No.
    
    Is any new testcase added?
    Yes. Added an end to end test case
    
    This closes #3616
---
 .../scan/filter/FilterExpressionProcessor.java     |  2 -
 .../core/scan/filter/intf/ExpressionType.java      |  3 +-
 .../geo/scan/expression/PolygonExpression.java     | 97 ++++++++++++++++------
 .../scala/org/apache/carbondata/geo/GeoTest.scala  | 36 ++++++--
 4 files changed, 98 insertions(+), 40 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 4d80fbc..e5405ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -168,8 +168,6 @@ public class FilterExpressionProcessor implements FilterProcessor {
       case TRUE:
         return getFilterResolverBasedOnExpressionType(ExpressionType.TRUE, false,
             expressionTree, tableIdentifier, expressionTree);
-      case POLYGON:
-        return createFilterResolverTree(expressionTree.getChildren().get(0), tableIdentifier);
       default:
         return getFilterResolverBasedOnExpressionType(ExpressionType.UNKNOWN, false, expressionTree,
             tableIdentifier, expressionTree);
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
index 5614dda..a89a84f 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/intf/ExpressionType.java
@@ -44,6 +44,5 @@ public enum ExpressionType {
   ENDSWITH,
   CONTAINSWITH,
   TEXT_MATCH,
-  IMPLICIT,
-  POLYGON
+  IMPLICIT
 }
diff --git a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
index 433866f..5f27b9a 100644
--- a/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
+++ b/geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolygonExpression.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -28,9 +29,8 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.ExpressionResult;
-import org.apache.carbondata.core.scan.expression.LiteralExpression;
-import org.apache.carbondata.core.scan.expression.conditional.InExpression;
-import org.apache.carbondata.core.scan.expression.conditional.ListExpression;
+import org.apache.carbondata.core.scan.expression.UnknownExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.util.CustomIndex;
@@ -41,64 +41,85 @@ import org.apache.carbondata.core.util.CustomIndex;
  * InExpression with list of all the IDs present in those list of ranges.
  */
 @InterfaceAudience.Internal
-public class PolygonExpression extends Expression {
+public class PolygonExpression extends UnknownExpression implements ConditionalExpression {
   private String polygon;
-  private String columnName;
   private CustomIndex<List<Long[]>> handler;
-  private List<Expression> children = new ArrayList<Expression>();
+  private List<Long[]> ranges = new ArrayList<Long[]>();
+  private ColumnExpression column;
+  private ExpressionResult trueExpRes;
+  private ExpressionResult falseExpRes;
 
   public PolygonExpression(String polygon, String columnName, CustomIndex handler) {
     this.polygon = polygon;
     this.handler = handler;
-    this.columnName = columnName;
+    this.column = new ColumnExpression(columnName, DataTypes.LONG);
+    this.trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
+    this.falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
   }
 
-  private void buildExpression(List<Long[]> ranges) {
-    // Build InExpression with list of all the values present in the ranges
-    List<Expression> inList = new ArrayList<Expression>();
+  private void validate(List<Long[]> ranges) {
+    // Validate the ranges
     for (Long[] range : ranges) {
       if (range.length != 2) {
         throw new RuntimeException("Handler query must return list of ranges with each range "
             + "containing minimum and maximum values");
       }
-      for (long i = range[0]; i <= range[1]; i++) {
-        inList.add(new LiteralExpression(i, DataTypes.LONG));
-      }
     }
-    children.add(new InExpression(new ColumnExpression(columnName, DataTypes.LONG),
-        new ListExpression(inList)));
   }
 
   /**
-   * This method builds InExpression with list of all the values present in the list of ranges of
-   * IDs.
+   * This method calls the query processor and gets the list of ranges of IDs.
    */
   private void processExpression() {
-    List<Long[]> ranges;
     try {
       ranges = handler.query(polygon);
+      validate(ranges);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    buildExpression(ranges);
+  }
+
+  private boolean rangeBinarySearch(List<Long[]> ranges, long searchForNumber) {
+    Long[] range;
+    int low = 0, mid, high = ranges.size() - 1;
+    while (low <= high) {
+      mid = low + ((high - low) / 2);
+      range = ranges.get(mid);
+      if (searchForNumber >= range[0]) {
+        if (searchForNumber <= range[1]) {
+          // Return true if the number is between min and max values of the range
+          return true;
+        } else {
+          // Number is bigger than this range's min and max. Search on the right side of the range
+          low = mid + 1;
+        }
+      } else {
+        // Number is smaller than this range's min and max. Search on the left side of the range
+        high = mid - 1;
+      }
+    }
+    return false;
   }
 
   @Override
   public ExpressionResult evaluate(RowIntf value) {
-    throw new UnsupportedOperationException("Operation not supported for Polygon expression");
+    if (rangeBinarySearch(ranges, (Long) value.getVal(0))) {
+      return trueExpRes;
+    }
+    return falseExpRes;
   }
 
   @Override
   public ExpressionType getFilterExpressionType() {
-    return ExpressionType.POLYGON;
+    return ExpressionType.UNKNOWN;
   }
 
   @Override
   public List<Expression> getChildren() {
-    if (children.isEmpty()) {
+    if (ranges.isEmpty()) {
       processExpression();
     }
-    return children;
+    return super.getChildren();
   }
 
   @Override
@@ -107,7 +128,7 @@ public class PolygonExpression extends Expression {
 
   @Override
   public String getString() {
-    return polygon;
+    return getStatement();
   }
 
   @Override
@@ -117,14 +138,36 @@ public class PolygonExpression extends Expression {
 
   private void writeObject(ObjectOutputStream out) throws IOException {
     out.writeObject(polygon);
-    out.writeObject(columnName);
     out.writeObject(handler);
+    out.writeObject(column);
   }
 
   private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
     polygon = (String) in.readObject();
-    columnName = (String) in.readObject();
     handler = (CustomIndex<List<Long[]>>) in.readObject();
-    children = new ArrayList<Expression>();
+    column = (ColumnExpression) in.readObject();
+    ranges = new ArrayList<Long[]>();
+    trueExpRes = new ExpressionResult(DataTypes.BOOLEAN, true);
+    falseExpRes = new ExpressionResult(DataTypes.BOOLEAN, false);
+  }
+
+  @Override
+  public List<ColumnExpression> getAllColumnList() {
+    return new ArrayList<ColumnExpression>(Arrays.asList(column));
+  }
+
+  @Override
+  public List<ColumnExpression> getColumnList() {
+    return getAllColumnList();
+  }
+
+  @Override
+  public boolean isSingleColumn() {
+    return true;
+  }
+
+  @Override
+  public List<ExpressionResult> getLiterals() {
+    return null;
   }
 }
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
index 7f05cc8..81c86ac 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/geo/GeoTest.scala
@@ -1,12 +1,13 @@
 package org.apache.carbondata.geo
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
-class GeoTest extends QueryTest with BeforeAndAfterAll {
+class GeoTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfter {
   override def beforeAll(): Unit = {
     drop()
   }
@@ -80,6 +81,23 @@ class GeoTest extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test polygon query") {
+    createTable()
+    loadData()
+    checkAnswer(
+      sql(s"select longitude, latitude from geotable where IN_POLYGON('116.321011 40.123503, " +
+          s"116.137676 39.947911, 116.560993 39.935276, 116.321011 40.123503')"),
+      Seq(Row(116187332, 39979316),
+        Row(116362699, 39942444),
+        Row(116288955, 39999101),
+        Row(116325378, 39963129),
+        Row(116337069, 39951887),
+        Row(116285807, 40084087)))
+  }
+
+  after {
+    drop()
+  }
   override def afterAll(): Unit = {
     drop()
   }
@@ -98,13 +116,13 @@ class GeoTest extends QueryTest with BeforeAndAfterAll {
            | TBLPROPERTIES ('INDEX_HANDLER'='mygeohash',
            | 'INDEX_HANDLER.mygeohash.type'='geohash',
            | 'INDEX_HANDLER.mygeohash.sourcecolumns'='longitude, latitude',
-           | 'INDEX_HANDLER.mygeohash.originLatitude'='1',
-           | 'INDEX_HANDLER.mygeohash.gridSize'='2',
-           | 'INDEX_HANDLER.mygeohash.minLongitude'='1',
-           | 'INDEX_HANDLER.mygeohash.maxLongitude'='4',
-           | 'INDEX_HANDLER.mygeohash.minLatitude'='1',
-           | 'INDEX_HANDLER.mygeohash.maxLatitude'='4',
-           | 'INDEX_HANDLER.mygeohash.conversionRatio'='1')
+           | 'INDEX_HANDLER.mygeohash.originLatitude'='39.832277',
+           | 'INDEX_HANDLER.mygeohash.gridSize'='50',
+           | 'INDEX_HANDLER.mygeohash.minLongitude'='115.811865',
+           | 'INDEX_HANDLER.mygeohash.maxLongitude'='116.782233',
+           | 'INDEX_HANDLER.mygeohash.minLatitude'='39.832277',
+           | 'INDEX_HANDLER.mygeohash.maxLatitude'='40.225281',
+           | 'INDEX_HANDLER.mygeohash.conversionRatio'='1000000')
        """.stripMargin)
   }