You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/08/23 07:00:42 UTC

[incubator-sedona] branch master updated: [SEDONA-151] Add ST aggregators to Sedona Flink (#672)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new d7b1382d [SEDONA-151] Add ST aggregators to Sedona Flink (#672)
d7b1382d is described below

commit d7b1382df343e5455d25470eac37c205995d0d2a
Author: Jia Yu <ji...@apache.org>
AuthorDate: Tue Aug 23 00:00:36 2022 -0700

    [SEDONA-151] Add ST aggregators to Sedona Flink (#672)
---
 docs/api/flink/Aggregator.md                       |  27 ++++
 docs/api/flink/Overview.md                         |   8 +-
 .../main/java/org/apache/sedona/flink/Catalog.java |   2 +
 .../sedona/flink/expressions/Accumulators.java     |  54 ++++++++
 .../sedona/flink/expressions/Aggregators.java      | 139 +++++++++++++++++++++
 .../java/org/apache/sedona/flink/AdapterTest.java  |   6 +-
 .../org/apache/sedona/flink/AggregatorTest.java    |  65 ++++++++++
 .../org/apache/sedona/flink/ConstructorTest.java   |  19 +--
 .../java/org/apache/sedona/flink/FunctionTest.java |   6 +-
 .../java/org/apache/sedona/flink/TestBase.java     | 102 +++++++++++----
 mkdocs.yml                                         |   1 +
 11 files changed, 388 insertions(+), 41 deletions(-)

diff --git a/docs/api/flink/Aggregator.md b/docs/api/flink/Aggregator.md
new file mode 100644
index 00000000..e886ded6
--- /dev/null
+++ b/docs/api/flink/Aggregator.md
@@ -0,0 +1,27 @@
+## ST_Envelope_Aggr
+
+Introduction: Return the entire envelope boundary of all geometries in A
+
+Format: `ST_Envelope_Aggr (A:geometryColumn)`
+
+Since: `v1.3.0`
+
+SQL example:
+```SQL
+SELECT ST_Envelope_Aggr(pointdf.arealandmark)
+FROM pointdf
+```
+
+## ST_Union_Aggr
+
+Introduction: Return the polygon union of all polygons in A. All inputs must be polygons.
+
+Format: `ST_Union_Aggr (A:geometryColumn)`
+
+Since: `v1.3.0`
+
+SQL example:
+```SQL
+SELECT ST_Union_Aggr(polygondf.polygonshape)
+FROM polygondf
+```
\ No newline at end of file
diff --git a/docs/api/flink/Overview.md b/docs/api/flink/Overview.md
index 07b5fcc1..95e1b7d6 100644
--- a/docs/api/flink/Overview.md
+++ b/docs/api/flink/Overview.md
@@ -5,8 +5,10 @@ SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. Please read the programmin
 Sedona includes SQL operators as follows.
 
 * Constructor: Construct a Geometry given an input string or coordinates
-	* Example: ST_GeomFromWKT (string). Create a Geometry from a WKT String.
+    * Example: ST_GeomFromWKT (string). Create a Geometry from a WKT String.
 * Function: Execute a function on the given column or columns
-	* Example: ST_Distance (A, B). Given two Geometry A and B, return the Euclidean distance of A and B.
+    * Example: ST_Distance (A, B). Given two Geometry A and B, return the Euclidean distance of A and B.
+* Aggregator: Return a single aggregated value on the given column
+	* Example: ST_Envelope_Aggr (Geometry column). Given a Geometry column, calculate the entire envelope boundary of this column.
 * Predicate: Execute a logic judgement on the given columns and return true or false
-	* Example: ST_Contains (A, B). Check if A fully contains B. Return "True" if yes, else return "False".
+    * Example: ST_Contains (A, B). Check if A fully contains B. Return "True" if yes, else return "False".
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
index adbeb85b..887c8cb9 100644
--- a/flink/src/main/java/org/apache/sedona/flink/Catalog.java
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -19,6 +19,8 @@ import org.apache.sedona.flink.expressions.*;
 public class Catalog {
     public static UserDefinedFunction[] getFuncs() {
         return new UserDefinedFunction[]{
+                new Aggregators.ST_Envelope_Aggr(),
+                new Aggregators.ST_Union_Aggr(),
                 new Constructors.ST_Point(),
                 new Constructors.ST_PointFromText(),
                 new Constructors.ST_LineStringFromText(),
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Accumulators.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Accumulators.java
new file mode 100644
index 00000000..fad05837
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Accumulators.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.locationtech.jts.geom.Geometry;
+
+/**
+ * Mutable accumulator of structured type for the aggregate function
+ */
+public class Accumulators {
+
+    public static class Envelope {
+        public double minX = Double.MAX_VALUE;
+        public double minY = Double.MAX_VALUE;
+        public double maxX = Double.MIN_VALUE;
+        public double maxY = Double.MIN_VALUE;
+        void reset() {
+            minX = Double.MAX_VALUE;
+            minY = Double.MAX_VALUE;
+            maxX = Double.MIN_VALUE;
+            maxY = Double.MIN_VALUE;
+        }
+    }
+    public static class AccGeometry {
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry geom;
+    }
+
+    public static class AccGeometry2 {
+        public Geometry geom1;
+        public Geometry geom2;
+    }
+
+    public static class AccGeometryN {
+        public Geometry[] geoms;
+        public int numGeoms;
+        AccGeometryN(int numGeoms) {
+            this.geoms = new Geometry[numGeoms];
+            this.numGeoms = numGeoms;
+        }
+    }
+}
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
new file mode 100644
index 00000000..3b019907
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Aggregators.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.AggregateFunction;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+
+public class Aggregators {
+    // Compute the rectangular boundary of a number of geometries
+    @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+    public static class ST_Envelope_Aggr extends AggregateFunction<Geometry, Accumulators.Envelope> {
+
+        Geometry createPolygon(double minX, double minY, double maxX, double maxY) {
+            Coordinate[] coords = new Coordinate[5];
+            coords[0] = new Coordinate(minX, minY);
+            coords[1] = new Coordinate(minX, maxY);
+            coords[2] = new Coordinate(maxX, maxY);
+            coords[3] = new Coordinate(maxX, minY);
+            coords[4] = coords[0];
+            GeometryFactory geomFact = new GeometryFactory();
+            return geomFact.createPolygon(coords);
+        }
+
+        @Override
+        public Accumulators.Envelope createAccumulator() {
+            return new Accumulators.Envelope();
+        }
+
+        @Override
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry getValue(Accumulators.Envelope acc) {
+            return createPolygon(acc.minX, acc.minY, acc.maxX, acc.maxY);
+        }
+
+        public void accumulate(Accumulators.Envelope acc,
+                               @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+            Envelope envelope = ((Geometry) o).getEnvelopeInternal();
+            acc.minX = Math.min(acc.minX, envelope.getMinX());
+            acc.minY = Math.min(acc.minY, envelope.getMinY());
+            acc.maxX = Math.max(acc.maxX, envelope.getMaxX());
+            acc.maxY = Math.max(acc.maxY, envelope.getMaxY());
+        }
+
+        /**
+         * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
+         *
+         * @param acc
+         * @param o
+         */
+        public void retract(Accumulators.Envelope acc,
+                            @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+            Geometry geometry = (Geometry) o;
+            assert(false);
+        }
+
+        public void merge(Accumulators.Envelope acc, Iterable<Accumulators.Envelope> it) {
+            for (Accumulators.Envelope a : it) {
+                acc.minX = Math.min(acc.minX, a.minX);
+                acc.minY = Math.min(acc.minY, a.minY);
+                acc.maxX = Math.max(acc.maxX, a.maxX);
+                acc.maxY = Math.max(acc.maxY, a.maxY);
+            }
+        }
+
+        public void resetAccumulator(Accumulators.Envelope acc) {
+            acc.reset();
+        }
+    }
+
+    // Compute the Union boundary of numbers of geometries
+    //
+    @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+    public static class ST_Union_Aggr extends AggregateFunction<Geometry, Accumulators.AccGeometry> {
+
+        @Override
+        public Accumulators.AccGeometry createAccumulator() {
+            return new Accumulators.AccGeometry();
+        }
+
+        @Override
+        @DataTypeHint(value = "RAW", bridgedTo = Geometry.class)
+        public Geometry getValue(Accumulators.AccGeometry acc) {
+            return acc.geom;
+        }
+
+        public void accumulate(Accumulators.AccGeometry acc,
+                               @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+            if (acc.geom == null){
+                acc.geom = (Geometry) o;
+            } else {
+                acc.geom = acc.geom.union((Geometry) o);
+            }
+        }
+
+        /**
+         * TODO: find an efficient algorithm to incrementally and decrementally update the accumulator
+         *
+         * @param acc
+         * @param o
+         */
+        public void retract(Accumulators.AccGeometry acc,
+                            @DataTypeHint(value = "RAW", bridgedTo = Geometry.class) Object o) {
+            Geometry geometry = (Geometry) o;
+            assert (false);
+        }
+
+        public void merge (Accumulators.AccGeometry acc, Iterable < Accumulators.AccGeometry > it){
+            for (Accumulators.AccGeometry a : it) {
+                    if (acc.geom == null){
+    //      make accumulate equal to acc
+                        acc.geom = a.geom;
+                    } else {
+                        acc.geom = acc.geom.union(a.geom);
+                    }
+                }
+        }
+
+        public void resetAccumulator (Accumulators.AccGeometry acc){
+            acc.geom = null;
+        }
+    }
+}
+
+
diff --git a/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java b/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java
index 6e429e95..3a24730c 100644
--- a/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/AdapterTest.java
@@ -48,13 +48,13 @@ public class AdapterTest extends TestBase
                         $(polygonColNames[0])).as(polygonColNames[0]),
                 $(polygonColNames[1]));
         Row result = last(geomTable);
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
         // GeomTable to GeomDS
         DataStream<Row> geomStream = tableEnv.toDataStream(geomTable);
-        assertEquals(data.get(0).toString(), geomStream.executeAndCollect(1).get(0).toString());
+        assertEquals(data.get(0).getField(0).toString(), geomStream.executeAndCollect(1).get(0).getField(0).toString());
         // GeomDS to GeomTable
         geomTable = tableEnv.fromDataStream(geomStream);
         result = last(geomTable);
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
     }
 }
diff --git a/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
new file mode 100644
index 00000000..1e8e3fe3
--- /dev/null
+++ b/flink/src/test/java/org/apache/sedona/flink/AggregatorTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.table.api.*;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.sedona.flink.expressions.Functions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.locationtech.jts.geom.Polygon;
+
+import static org.apache.flink.table.api.Expressions.*;
+import static org.junit.Assert.assertEquals;
+
+public class AggregatorTest extends TestBase{
+    @BeforeClass
+    public static void onceExecutedBeforeAll() {
+        initialize();
+    }
+
+    @Test
+    public void testEnvelop_Aggr() {
+        Table pointTable = createPointTable(testDataSize);
+        Table result = pointTable.select(call("ST_Envelope_Aggr", $(pointColNames[0])));
+        Row last = last(result);
+        assertEquals(String.format("POLYGON ((0 0, 0 %s, %s %s, %s 0, 0 0))", testDataSize - 1, testDataSize - 1,
+                testDataSize - 1, testDataSize - 1), last.getField(0).toString());
+    }
+
+    @Test
+    public void testKNN() {
+        Table pointTable = createPointTable(testDataSize);
+        pointTable = pointTable.select($(pointColNames[0]), call(Functions.ST_Distance.class.getSimpleName(), $(pointColNames[0])
+                , call("ST_GeomFromWKT", "POINT (0 0)")).as("distance"));
+        tableEnv.createTemporaryView(pointTableName, pointTable);
+        Table resultTable = tableEnv.sqlQuery("SELECT distance, " + pointColNames[0] + " " +
+                "FROM (" +
+                "SELECT *, ROW_NUMBER() OVER (ORDER BY distance ASC) AS row_num " +
+                "FROM " + pointTableName +
+                ")" +
+                "WHERE row_num <= 5");
+        assertEquals(0.0, first(resultTable).getField(0));
+        assertEquals(5.656854249492381, last(resultTable).getField(0));
+    }
+
+    @Test
+    public void testUnion_Aggr(){
+        Table polygonTable = createPolygonOverlappingTable(testDataSize);
+        Table result = polygonTable.select(call("ST_Union_Aggr", $(polygonColNames[0])));
+        Row last = last(result);
+        assertEquals(1001, ((Polygon) last.getField(0)).getArea(), 0);
+    }
+}
diff --git a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
index 7585b4f4..c216bc2f 100644
--- a/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/ConstructorTest.java
@@ -24,6 +24,7 @@ import org.locationtech.jts.geom.GeometryFactory;
 import org.wololo.jts2geojson.GeoJSONReader;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.flink.table.api.Expressions.$;
@@ -74,7 +75,7 @@ public class ConstructorTest extends TestBase{
     public void testPointFromText() {
         List<Row> data = createPointWKT(testDataSize);
         Row result = last(createPointTable(testDataSize));
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
     }
 
     @Test
@@ -86,7 +87,7 @@ public class ConstructorTest extends TestBase{
                         $(linestringColNames[1]));
         Row result = last(lineStringTable);
 
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
     }
 
     @Test
@@ -98,14 +99,14 @@ public class ConstructorTest extends TestBase{
                         $(linestringColNames[1]));
         Row result = last(lineStringTable);
 
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
     }
 
     @Test
     public void testPolygonFromText() {
         List<Row> data = createPolygonWKT(testDataSize);
         Row result = last(createPolygonTable(testDataSize));
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
     }
 
     @Test
@@ -116,7 +117,7 @@ public class ConstructorTest extends TestBase{
                 $(polygonColNames[0])).as(polygonColNames[0]),
                 $(polygonColNames[1]));
         Row result = last(geomTable);
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
     }
 
     @Test
@@ -127,7 +128,7 @@ public class ConstructorTest extends TestBase{
                         $(polygonColNames[0])).as(polygonColNames[0]),
                 $(polygonColNames[1]));
         Row result = last(geomTable);
-        assertEquals(data.get(data.size() - 1).toString(), result.toString());
+        assertEquals(data.get(data.size() - 1).getField(0).toString(), result.getField(0).toString());
     }
 
     @Test
@@ -179,7 +180,7 @@ public class ConstructorTest extends TestBase{
         TypeInformation<?>[] colTypes = {
                 PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
                 BasicTypeInfo.STRING_TYPE_INFO};
-        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, polygonColNames);
+        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(polygonColNames, 0, 2));
         DataStream<Row> wkbDS = env.fromCollection(data).returns(typeInfo);
         Table wkbTable = tableEnv.fromDataStream(wkbDS, $(polygonColNames[0]), $(polygonColNames[1]));
 
@@ -199,7 +200,7 @@ public class ConstructorTest extends TestBase{
     public void testGeomFromGeoHash() {
         Integer precision = 2;
         List<Row> data = new ArrayList<>();
-        data.add(Row.of("2131s12fd", "polygon"));
+        data.add(Row.of("2131s12fd", "polygon", 0L));
 
         Table geohashTable = createTextTable(data, polygonColNames);
         Table geomTable = geohashTable
@@ -217,7 +218,7 @@ public class ConstructorTest extends TestBase{
     @Test
     public void testGeomFromGeoHashNullPrecision() {
         List<Row> data = new ArrayList<>();
-        data.add(Row.of("2131s12fd", "polygon"));
+        data.add(Row.of("2131s12fd", "polygon", 0L));
 
         Table geohashTable = createTextTable(data, polygonColNames);
         Table geomTable = geohashTable
diff --git a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
index 9e3a20c4..1ee2a743 100644
--- a/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
+++ b/flink/src/test/java/org/apache/sedona/flink/FunctionTest.java
@@ -54,7 +54,7 @@ public class FunctionTest extends TestBase{
         Table pointTable = createPointTable_real(testDataSize);
         Table flippedTable = pointTable.select(call(Functions.ST_FlipCoordinates.class.getSimpleName(), $(pointColNames[0])));
         Geometry result = (Geometry) first(flippedTable).getField(0);
-        assertEquals("POINT (-118 32)", result.toString());
+        assertEquals("POINT (-117.99 32.01)", result.toString());
     }
 
     @Test
@@ -63,7 +63,7 @@ public class FunctionTest extends TestBase{
         Table transformedTable = pointTable.select(call(Functions.ST_Transform.class.getSimpleName(), $(pointColNames[0])
                 , "epsg:4326", "epsg:3857"));
         String result = first(transformedTable).getField(0).toString();
-        assertEquals("POINT (-13135699.91360628 3763310.6271446524)", result);
+        assertEquals("POINT (-13134586.718698347 3764623.3541299687)", result);
     }
 
     @Test
@@ -107,7 +107,7 @@ public class FunctionTest extends TestBase{
         Table pointTable = createPointTable_real(testDataSize);
         Table surfaceTable = pointTable.select(call(Functions.ST_PointOnSurface.class.getSimpleName(), $(pointColNames[0])));
         Geometry result = (Geometry) first(surfaceTable).getField(0);
-        assertEquals("POINT (32 -118)", result.toString());
+        assertEquals("POINT (32.01 -117.99)", result.toString());
     }
 
     @Test
diff --git a/flink/src/test/java/org/apache/sedona/flink/TestBase.java b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
index 81e2cbd1..26c5f14c 100644
--- a/flink/src/test/java/org/apache/sedona/flink/TestBase.java
+++ b/flink/src/test/java/org/apache/sedona/flink/TestBase.java
@@ -13,6 +13,7 @@
  */
 package org.apache.sedona.flink;
 
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
@@ -30,7 +31,9 @@ import org.apache.sedona.flink.expressions.Constructors;
 import org.locationtech.jts.geom.*;
 import org.wololo.jts2geojson.GeoJSONWriter;
 
+import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.apache.flink.table.api.Expressions.$;
@@ -40,11 +43,13 @@ public class TestBase {
     protected static StreamExecutionEnvironment env;
     protected static StreamTableEnvironment tableEnv;
     static int testDataSize = 1000;
-    static String[] pointColNames = {"geom_point", "name_point"};
-    static String[] linestringColNames = {"geom_linestring", "name_linestring"};
-    static String[] polygonColNames = {"geom_polygon", "name_polygon"};
+    static String[] pointColNames = {"geom_point", "name_point", "event_time", "proc_time"};
+    static String[] linestringColNames = {"geom_linestring", "name_linestring", "event_time", "proc_time"};
+    static String[] polygonColNames = {"geom_polygon", "name_polygon", "event_time", "proc_time"};
     static String pointTableName = "point_table";
     static String polygonTableName = "polygon_table";
+    static Long timestamp_base = new Timestamp(System.currentTimeMillis()).getTime();
+    static Long time_interval = 1L; // Generate a record per this interval. Unit is second
 
     public void setTestDataSize(int testDataSize) {
         this.testDataSize = testDataSize;
@@ -68,7 +73,7 @@ public class TestBase {
         List<Row> data = new ArrayList<>();
         for (int i = 0; i < size; i++) {
             // Create a numer of points (1, 1) (2, 2) ...
-            data.add(Row.of(i + "," + i, "point" + i));
+            data.add(Row.of(i + "," + i, "point" + i, timestamp_base + time_interval * 1000 * i));
         }
         return data;
     }
@@ -86,10 +91,13 @@ public class TestBase {
     // Simulate some points in the US
     static List<Row> createPointText_real(int size){
         List<Row> data = new ArrayList<>();
-        for (double i = 0; i < 10.0; i = i + 10.0/size) {
-            double x = 32.0 + i;
-            double y = -118.0 + i;
-            data.add(Row.of(x + "," + y, "point"));
+        double x = 32.0;
+        double y = -118.0;
+        double increment = 10.0/size;
+        for (int i = 0; i < size; i++) {
+            x += increment;
+            y += increment;
+            data.add(Row.of(x + "," + y, "point" + i, timestamp_base + time_interval * 1000 * i));
         }
         return data;
     }
@@ -98,7 +106,7 @@ public class TestBase {
         List<Row> data = new ArrayList<>();
         for (int i = 0; i < size; i++) {
             // Create a numer of points (1, 1) (2, 2) ...
-            data.add(Row.of("POINT (" + i + " " + i +")", "point" + i));
+            data.add(Row.of("POINT (" + i + " " + i +")", "point" + i, timestamp_base + time_interval * 1000 * i));
         }
         return data;
     }
@@ -118,7 +126,7 @@ public class TestBase {
             polygon.add(maxX);polygon.add(maxY);
             polygon.add(maxX);polygon.add(minY);
             polygon.add(minX);polygon.add(minY);
-            data.add(Row.of(String.join(",", polygon), "polygon" + i));
+            data.add(Row.of(String.join(",", polygon), "polygon" + i, timestamp_base + time_interval * 1000 * i));
         }
         return data;
     }
@@ -138,7 +146,7 @@ public class TestBase {
             linestring.add(maxX);
             linestring.add(maxY);
 
-            data.add(Row.of(String.join(",", linestring), "linestring" + i));
+            data.add(Row.of(String.join(",", linestring), "linestring" + i, timestamp_base + time_interval * 1000 * i));
         }
         return data;
     }
@@ -156,7 +164,30 @@ public class TestBase {
             linestring.add(minX + " " + minY);
             linestring.add(maxX + " " + maxY);
 
-            data.add(Row.of("LINESTRING (" + String.join(", ", linestring) + ")", "linestring" + i));
+            data.add(Row.of("LINESTRING (" + String.join(", ", linestring) + ")", "linestring" + i, timestamp_base + time_interval * 1000 * i));
+        }
+        return data;
+    }
+
+    // createPolyOverlapping
+    static List<Row> createPolygonOverlapping(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create polygons each of which only has 1 match in points
+            // The polygons are like (-1, 0, 1, 1)
+            // (0, 0, 2, 1)
+            // (1, 0, 3, 1)
+            String minX = String.valueOf(i - 1);
+            String minY = String.valueOf(0);
+            String maxX = String.valueOf(i + 1);
+            String maxY = String.valueOf(1);
+            List<String> polygon = new ArrayList<>();
+            polygon.add(minX);polygon.add(minY);
+            polygon.add(minX);polygon.add(maxY);
+            polygon.add(maxX);polygon.add(maxY);
+            polygon.add(maxX);polygon.add(minY);
+            polygon.add(minX);polygon.add(minY);
+            data.add(Row.of(String.join(",", polygon),"polygon" + i, timestamp_base + time_interval * 1000 * i));
         }
         return data;
     }
@@ -176,7 +207,7 @@ public class TestBase {
             polygon.add(maxX + " " + maxY);
             polygon.add(maxX + " " + minY);
             polygon.add(minX + " " + minY);
-            data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", "polygon" + i));
+            data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", "polygon" + i, timestamp_base + time_interval * 1000 * i));
         }
         return data;
     }
@@ -202,7 +233,7 @@ public class TestBase {
             Geometry polygon = geometryFactory.createPolygon(points);
 
             String geoJson = writer.write(polygon).toString();
-            data.add(Row.of(geoJson, "polygon" + i));
+            data.add(Row.of(geoJson, "polygon" + i, timestamp_base + time_interval * 1000 * i));
         }
 
         return data;
@@ -211,10 +242,17 @@ public class TestBase {
     static Table createTextTable(List<Row> data, String[] colNames){
         TypeInformation<?>[] colTypes = {
                 BasicTypeInfo.STRING_TYPE_INFO,
-                BasicTypeInfo.STRING_TYPE_INFO};
-        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames);
+                BasicTypeInfo.STRING_TYPE_INFO,
+                BasicTypeInfo.LONG_TYPE_INFO
+        };
+        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, Arrays.copyOfRange(colNames, 0, 3));
         DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
-        return tableEnv.fromDataStream(ds, $(colNames[0]), $(colNames[1]));
+        // Generate Time Attribute
+        WatermarkStrategy<Row> wmStrategy =
+                WatermarkStrategy
+                        .<Row>forMonotonousTimestamps()
+                        .withTimestampAssigner((event, timestamp) -> event.getFieldAs(2));
+        return tableEnv.fromDataStream(ds.assignTimestampsAndWatermarks(wmStrategy), $(colNames[0]), $(colNames[1]), $(colNames[2]).rowtime(), $(colNames[3]).proctime());
     }
 
     static Table createPointTextTable(int size){
@@ -233,36 +271,49 @@ public class TestBase {
         return createTextTable(createPolygonText(size), polygonColNames);
     }
 
+    static Table createPolygonTextOverlappingTable(int size) {
+        return createTextTable(createPolygonOverlapping(size), polygonColNames);
+    }
+
     static Table createPointTable(int size){
         return createPointTextTable(size)
                 .select(call(Constructors.ST_PointFromText.class.getSimpleName(),
                         $(pointColNames[0])).as(pointColNames[0]),
-                        $(pointColNames[1]));
+                        $(pointColNames[1]), $(pointColNames[2]), $(pointColNames[3]));
     }
 
     static Table createPointTable_real(int size){
         return createPointTextTable_real(size)
                 .select(call(Constructors.ST_PointFromText.class.getSimpleName(),
                         $(pointColNames[0])).as(pointColNames[0]),
-                        $(pointColNames[1]));
+                        $(pointColNames[1]), $(pointColNames[2]), $(pointColNames[3]));
     }
 
     static Table createLineStringTable(int size) {
         return createLineStringTextTable(size)
                 .select(call(Constructors.ST_LineStringFromText.class.getSimpleName(),
                         $(linestringColNames[0])).as(linestringColNames[0]),
-                        $(linestringColNames[1]));
+                        $(linestringColNames[1]), $(linestringColNames[2]), $(linestringColNames[3]));
     }
 
     Table createPolygonTable(int size) {
         return createPolygonTextTable(size)
                 .select(call(Constructors.ST_PolygonFromText.class.getSimpleName(),
                         $(polygonColNames[0])).as(polygonColNames[0]),
-                        $(polygonColNames[1]));
+                        $(polygonColNames[1]), $(polygonColNames[2]), $(polygonColNames[3]));
+    }
+
+    //createPolygonTextOverlapping
+
+    Table createPolygonOverlappingTable(int size) {
+        return createPolygonTextOverlappingTable(size)
+                .select(call(Constructors.ST_PolygonFromText.class.getSimpleName(),
+                                $(polygonColNames[0])).as(polygonColNames[0]),
+                        $(polygonColNames[1]), $(polygonColNames[2]), $(polygonColNames[3]));
     }
 
     /**
-     * Get the iterator of the flink
+     * Get the iterator of the table
      * @param table
      * @return
      */
@@ -271,7 +322,7 @@ public class TestBase {
     }
 
     /**
-     * Iterate to the last row of the flink
+     * Iterate to the last row of the table
      * @param table
      * @return
      */
@@ -282,6 +333,11 @@ public class TestBase {
         return lastRow;
     }
 
+    /**
+     * Get the first row of the table
+     * @param table
+     * @return
+     */
     static Row first(Table table) {
         CloseableIterator<Row> it = iterate(table);
         assert(it.hasNext());
diff --git a/mkdocs.yml b/mkdocs.yml
index def129a9..a0b898b8 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -76,6 +76,7 @@ nav:
           - Overview: api/flink/Overview.md
           - Constructor: api/flink/Constructor.md
           - Function: api/flink/Function.md
+          - Aggregator: api/flink/Aggregator.md
           - Predicate: api/flink/Predicate.md
     - Community:
       - Community: community/contact.md