Posted to commits@sedona.apache.org by ji...@apache.org on 2022/01/29 23:52:34 UTC

[incubator-sedona] branch flink-support updated: Add the basic flink support

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch flink-support
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/flink-support by this push:
     new b0cece0  Add the basic flink support
b0cece0 is described below

commit b0cece0b472d00a83ffae9aac0ebd89e8e44a8a4
Author: Jia Yu <ji...@apache.org>
AuthorDate: Sat Jan 29 15:52:25 2022 -0800

    Add the basic flink support
---
 .../sedona/core/joinJudgement/JudgementBase.java   |  38 +---
 .../sedona/core/joinJudgement/JudgementHelper.java |  58 ++++++
 flink/.gitignore                                   |   4 +
 flink/pom.xml                                      | 149 ++++++++++++++
 .../main/java/org/apache/sedona/flink/Catalog.java |  41 ++++
 .../sedona/flink/SedonaFlinkRegistrator.java       |  54 +++++
 .../sedona/flink/expressions/Constructors.java     |  87 ++++++++
 .../apache/sedona/flink/expressions/Functions.java |  69 +++++++
 .../sedona/flink/expressions/JoinOperators.java    |  56 +++++
 .../sedona/flink/expressions/Predicates.java       | 107 ++++++++++
 .../org.apache.sedona/flink/ConstructorTest.java   |  83 ++++++++
 .../java/org.apache.sedona/flink/FunctionTest.java |  56 +++++
 .../org.apache.sedona/flink/PredicateTest.java     |  36 ++++
 .../java/org.apache.sedona/flink/TestBase.java     | 225 +++++++++++++++++++++
 pom.xml                                            |   1 +
 15 files changed, 1028 insertions(+), 36 deletions(-)

diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
index 582766f..a3f2394 100644
--- a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementBase.java
@@ -122,42 +122,8 @@ abstract class JudgementBase
         }
     }
 
-    protected boolean match(Geometry left, Geometry right)
+    public boolean match(Geometry left, Geometry right)
     {
-        if (extent != null) {
-            // Handle easy case: points. Since each point is assigned to exactly one partition,
-            // different partitions cannot emit duplicate results.
-            if (left instanceof Point || right instanceof Point) {
-                return geoMatch(left, right);
-            }
-
-            // Neither geometry is a point
-
-            // Check if reference point of the intersection of the bounding boxes lies within
-            // the extent of this partition. If not, don't run any checks. Let the partition
-            // that contains the reference point do all the work.
-            Envelope intersection =
-                    left.getEnvelopeInternal().intersection(right.getEnvelopeInternal());
-            if (!intersection.isNull()) {
-                final Point referencePoint =
-                        makePoint(intersection.getMinX(), intersection.getMinY(), left.getFactory());
-                if (!extent.contains(referencePoint)) {
-                    return false;
-                }
-            }
-        }
-
-        return geoMatch(left, right);
-    }
-
-    private Point makePoint(double x, double y, GeometryFactory factory)
-    {
-        return factory.createPoint(new Coordinate(x, y));
-    }
-
-    private boolean geoMatch(Geometry left, Geometry right)
-    {
-        //log.warn("Check "+left.toText()+" with "+right.toText());
-        return considerBoundaryIntersection ? left.intersects(right) : left.covers(right);
+        return JudgementHelper.match(left, right, extent, considerBoundaryIntersection);
     }
 }
diff --git a/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementHelper.java b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementHelper.java
new file mode 100644
index 0000000..273ccde
--- /dev/null
+++ b/core/src/main/java/org/apache/sedona/core/joinJudgement/JudgementHelper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.core.joinJudgement;
+
+import org.apache.sedona.core.utils.HalfOpenRectangle;
+import org.locationtech.jts.geom.*;
+
+public class JudgementHelper {
+    public static boolean match(Geometry left, Geometry right, HalfOpenRectangle extent, boolean considerBoundaryIntersection)
+    {
+        if (extent != null) {
+            // Handle easy case: points. Since each point is assigned to exactly one partition,
+            // different partitions cannot emit duplicate results.
+            if (left instanceof Point || right instanceof Point) {
+                return geoMatch(left, right, considerBoundaryIntersection);
+            }
+
+            // Neither geometry is a point
+
+            // Check if reference point of the intersection of the bounding boxes lies within
+            // the extent of this partition. If not, don't run any checks. Let the partition
+            // that contains the reference point do all the work.
+            Envelope intersection =
+                    left.getEnvelopeInternal().intersection(right.getEnvelopeInternal());
+            if (!intersection.isNull()) {
+                final Point referencePoint =
+                        makePoint(intersection.getMinX(), intersection.getMinY(), left.getFactory());
+                if (!extent.contains(referencePoint)) {
+                    return false;
+                }
+            }
+        }
+
+        return geoMatch(left, right, considerBoundaryIntersection);
+    }
+
+    private static Point makePoint(double x, double y, GeometryFactory factory)
+    {
+        return factory.createPoint(new Coordinate(x, y));
+    }
+
+    private static boolean geoMatch(Geometry left, Geometry right, boolean considerBoundaryIntersection)
+    {
+        //log.warn("Check "+left.toText()+" with "+right.toText());
+        return considerBoundaryIntersection ? left.intersects(right) : left.covers(right);
+    }
+}
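
A minimal sketch (not part of this commit) of how the reference-point rule removes
duplicate join results: only the partition whose extent contains the bottom-left corner
of the two envelopes' intersection reports a candidate pair. The geometries and grid
cells below are made-up sample values.

    import org.apache.sedona.core.joinJudgement.JudgementHelper;
    import org.apache.sedona.core.utils.HalfOpenRectangle;
    import org.locationtech.jts.geom.Envelope;
    import org.locationtech.jts.geom.Geometry;
    import org.locationtech.jts.io.ParseException;
    import org.locationtech.jts.io.WKTReader;

    public class DedupSketch {
        public static void main(String[] args) throws ParseException {
            WKTReader reader = new WKTReader();
            Geometry left = reader.read("POLYGON ((0 0, 0 2, 2 2, 2 0, 0 0))");
            Geometry right = reader.read("POLYGON ((1 1, 1 3, 3 3, 3 1, 1 1))");
            // The envelope intersection is (1, 1)-(2, 2), so the reference point is (1, 1).
            HalfOpenRectangle owningCell = new HalfOpenRectangle(new Envelope(0, 2, 0, 2));
            HalfOpenRectangle otherCell = new HalfOpenRectangle(new Envelope(2, 4, 2, 4));
            // true: this partition contains the reference point and the geometries intersect
            System.out.println(JudgementHelper.match(left, right, owningCell, true));
            // false: the pair is left to the partition that owns (1, 1)
            System.out.println(JudgementHelper.match(left, right, otherCell, true));
        }
    }
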
diff --git a/flink/.gitignore b/flink/.gitignore
new file mode 100644
index 0000000..c5c30ba
--- /dev/null
+++ b/flink/.gitignore
@@ -0,0 +1,4 @@
+/target/
+/streaming.iml
+/sedona-flink-3.0_2.12.iml
+*.12.iml
diff --git a/flink/pom.xml b/flink/pom.xml
new file mode 100644
index 0000000..53c23a0
--- /dev/null
+++ b/flink/pom.xml
@@ -0,0 +1,149 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.sedona</groupId>
+        <artifactId>sedona-parent</artifactId>
+        <version>1.2.0-incubating-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+	<artifactId>sedona-flink_${scala.compat.version}</artifactId>
+
+	<name>${project.groupId}:${project.artifactId}</name>
+	<description>A cluster computing system for processing large-scale spatial data: Streaming API for Apache Flink. Apache Sedona is an effort undergoing incubation at The Apache Software Foundation (ASF), sponsored by the Apache Incubator. Incubation is required of all newly accepted projects until a further review indicates that the infrastructure, communications, and decision making process have stabilized in a manner consistent with other successful ASF projects. While incubation statu [...]
+    <url>http://sedona.apache.org/</url>
+	<packaging>jar</packaging>
+
+    <properties>
+        <maven.deploy.skip>false</maven.deploy.skip>
+        <flink.version>1.14.3</flink.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.sedona</groupId>
+            <artifactId>sedona-core-${spark.compat.version}_${scala.compat.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sedona</groupId>
+            <artifactId>sedona-sql-${spark.compat.version}_${scala.compat.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+<!--        For Flink DataStream API-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+<!--        Flink Kafka connector-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+<!--        For running Flink in an IDE-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+<!--        For the Flink Table API, planner, UDF/UDT, and CSV support-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+<!--        Starting with Flink 1.14, the Blink planner has been renamed to the official Flink planner-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${flink.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+<!--        For the Flink Web UI in tests-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime-web_${scala.compat.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.jheaps</groupId>
+            <artifactId>jheaps</artifactId>
+            <version>0.14</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+<!--	<build>-->
+<!--        <sourceDirectory>src/main/scala</sourceDirectory>-->
+<!--        <plugins>-->
+<!--            <plugin>-->
+<!--                <groupId>org.scalastyle</groupId>-->
+<!--                <artifactId>scalastyle-maven-plugin</artifactId>-->
+<!--                <version>1.0.0</version>-->
+<!--                <configuration>-->
+<!--                    <verbose>false</verbose>-->
+<!--                    <failOnViolation>true</failOnViolation>-->
+<!--                    <includeTestSourceDirectory>true</includeTestSourceDirectory>-->
+<!--                    <failOnWarning>false</failOnWarning>-->
+<!--                    <sourceDirectory>${project.basedir}/src/main/scala</sourceDirectory>-->
+<!--                    <testSourceDirectory>${project.basedir}/src/test/scala</testSourceDirectory>-->
+<!--                    <configLocation>${project.basedir}/../scalastyle_config.xml</configLocation>-->
+<!--                    <outputFile>${project.basedir}/target/scalastyle-output.xml</outputFile>-->
+<!--                    <outputEncoding>UTF-8</outputEncoding>-->
+<!--                </configuration>-->
+<!--                <executions>-->
+<!--                    <execution>-->
+<!--                        <goals>-->
+<!--                            <goal>check</goal>-->
+<!--                        </goals>-->
+<!--                    </execution>-->
+<!--                </executions>-->
+<!--            </plugin>-->
+<!--        </plugins>-->
+<!--	</build>-->
+</project>
+  
diff --git a/flink/src/main/java/org/apache/sedona/flink/Catalog.java b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
new file mode 100644
index 0000000..f9428fe
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/Catalog.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
+import org.apache.sedona.flink.expressions.*;
+
+public class Catalog {
+    public static UserDefinedFunction[] getFuncs() {
+        return new UserDefinedFunction[]{
+                new Constructors.ST_PointFromText(),
+                new Constructors.ST_PolygonFromText(),
+                new Constructors.ST_PolygonFromEnvelope(),
+                new Constructors.ST_GeomFromWKT(),
+                new Constructors.ST_GeomFromWKB(),
+                new Functions.ST_Buffer(),
+                new Functions.ST_Distance(),
+                new Functions.ST_Transform(),
+                new Functions.ST_FlipCoordinates(),
+        };
+    }
+
+    public static UserDefinedFunction[] getPredicates() {
+        return new UserDefinedFunction[]{
+                new Predicates.ST_Intersects(),
+                new Predicates.ST_Contains()
+        };
+    }
+}
diff --git a/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java b/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java
new file mode 100644
index 0000000..7d63bff
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/SedonaFlinkRegistrator.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.sedona.core.geometryObjects.Circle;
+import org.apache.sedona.core.geometryObjects.GeometrySerde;
+import org.apache.sedona.core.geometryObjects.SpatialIndexSerde;
+import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
+import org.locationtech.jts.geom.*;
+import org.locationtech.jts.index.quadtree.Quadtree;
+import org.locationtech.jts.index.strtree.STRtree;
+
+import java.util.Arrays;
+
+public class SedonaFlinkRegistrator {
+
+    public static void registerFunc(StreamTableEnvironment tblEnv) {
+        Arrays.stream(Catalog.getFuncs()).forEach(
+                func -> tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
+        );
+        Arrays.stream(Catalog.getPredicates()).forEach(
+                func -> tblEnv.createTemporarySystemFunction(func.getClass().getSimpleName(), func)
+        );
+    }
+
+    public static void registerType(StreamExecutionEnvironment env) {
+        GeometrySerde serializer = new GeometrySerde();
+        SpatialIndexSerde indexSerializer = new SpatialIndexSerde(serializer);
+        env.getConfig().registerTypeWithKryoSerializer(Point.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(LineString.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(Polygon.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(MultiPoint.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(MultiLineString.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(MultiPolygon.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(GeometryCollection.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(Circle.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(Envelope.class, serializer);
+        env.getConfig().registerTypeWithKryoSerializer(Quadtree.class, indexSerializer);
+        env.getConfig().registerTypeWithKryoSerializer(STRtree.class, indexSerializer);
+    }
+}
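
A minimal usage sketch (not part of this commit), mirroring the test setup in TestBase
below: register the Kryo serializers and the UDF catalog, then call the functions from
Flink SQL. The functions are registered under their simple class names from Catalog.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.sedona.flink.SedonaFlinkRegistrator;

    public class SedonaFlinkQuickstart {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

            // Kryo serializers for JTS geometries / spatial indexes, then the ST_* UDFs
            SedonaFlinkRegistrator.registerType(env);
            SedonaFlinkRegistrator.registerFunc(tableEnv);

            tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POINT (1 2)')").execute().print();
        }
    }
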
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
new file mode 100644
index 0000000..d0f1610
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Constructors.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.core.enums.FileDataSplitter;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.formatMapper.FormatUtils;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.io.ParseException;
+
+public class Constructors {
+    public static class ST_PointFromText extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
+            FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
+            FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POINT);
+            return formatUtils.readGeometry(s);
+        }
+
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
+            return eval(s, null);
+        }
+    }
+
+    public static class ST_PolygonFromText extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("String") String s, @DataTypeHint("String") String inputDelimiter) throws ParseException {
+            // The default delimiter is comma. Otherwise, use the delimiter given by the user
+            // TODO: The getSplitter function uses a loop to find the instance. Should use HashTable instead.
+            FileDataSplitter delimiter = inputDelimiter == null? FileDataSplitter.CSV:FileDataSplitter.getFileDataSplitter(inputDelimiter);
+            FormatUtils<Geometry> formatUtils = new FormatUtils(delimiter, false, GeometryType.POLYGON);
+            return formatUtils.readGeometry(s);
+        }
+
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("String") String s) throws ParseException {
+            return eval(s, null);
+        }
+    }
+
+    public static class ST_PolygonFromEnvelope extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("Double") Double minX, @DataTypeHint("Double") Double minY,
+                             @DataTypeHint("Double") Double maxX, @DataTypeHint("Double") Double maxY) {
+            Coordinate[] coordinates = new Coordinate[5];
+            coordinates[0] = new Coordinate(minX, minY);
+            coordinates[1] = new Coordinate(minX, maxY);
+            coordinates[2] = new Coordinate(maxX, maxY);
+            coordinates[3] = new Coordinate(maxX, minY);
+            coordinates[4] = coordinates[0];
+            GeometryFactory geometryFactory = new GeometryFactory();
+            return geometryFactory.createPolygon(coordinates);
+        }
+    }
+
+    public static class ST_GeomFromWKT extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("String") String wktString) throws ParseException {
+            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
+            return formatUtils.readGeometry(wktString);
+        }
+    }
+
+    public static class ST_GeomFromWKB extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("String") String wkbString) throws ParseException {
+            FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKB, false);
+            return formatUtils.readGeometry(wkbString);
+        }
+    }
+}
\ No newline at end of file
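
A minimal sketch (not part of this commit) of the constructor functions from SQL; it
assumes a StreamTableEnvironment already prepared by SedonaFlinkRegistrator, and the
literals are made-up sample data.

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    class ConstructorSketch {
        static Table examples(StreamTableEnvironment tableEnv) {
            // Single-argument ST_PointFromText falls back to the CSV delimiter ("x,y")
            tableEnv.sqlQuery("SELECT ST_PointFromText('1.0,2.0')");
            // WKT input
            tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POINT (1.0 2.0)')");
            // Bounding box (minX, minY, maxX, maxY) turned into a closed polygon
            return tableEnv.sqlQuery("SELECT ST_PolygonFromEnvelope(1.0, 100.0, 2.0, 200.0)");
        }
    }
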
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
new file mode 100644
index 0000000..8bd4b89
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Functions.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.core.utils.GeomUtils;
+import org.geotools.geometry.jts.JTS;
+import org.geotools.referencing.CRS;
+import org.locationtech.jts.geom.Geometry;
+import org.opengis.referencing.FactoryException;
+import org.opengis.referencing.crs.CoordinateReferenceSystem;
+import org.opengis.referencing.operation.MathTransform;
+import org.opengis.referencing.operation.TransformException;
+
+public class Functions {
+    public static class ST_Buffer extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("RAW") Object o, @DataTypeHint("Double") Double radius) {
+            Geometry geom = (Geometry) o;
+            return geom.buffer(radius);
+        }
+    }
+
+    public static class ST_Distance extends ScalarFunction {
+        @DataTypeHint("Double")
+        public Double eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+            Geometry geom1 = (Geometry) o1;
+            Geometry geom2 = (Geometry) o2;
+            return geom1.distance(geom2);
+        }
+    }
+
+    public static class ST_Transform extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("RAW") Object o, @DataTypeHint("String") String sourceCRS, @DataTypeHint("String") String targetCRS) {
+            Geometry geom = (Geometry) o;
+            try {
+                CoordinateReferenceSystem sourceCRScode = CRS.decode(sourceCRS);
+                CoordinateReferenceSystem targetCRScode = CRS.decode(targetCRS);
+                MathTransform transform = CRS.findMathTransform(sourceCRScode, targetCRScode);
+                geom = JTS.transform(geom, transform);
+            } catch (FactoryException | TransformException e) {
+                e.printStackTrace();
+            }
+            return geom;
+        }
+    }
+
+    public static class ST_FlipCoordinates extends ScalarFunction {
+        @DataTypeHint("RAW")
+        public Geometry eval(@DataTypeHint("RAW") Object o) {
+            Geometry geom = (Geometry) o;
+            GeomUtils.flipCoordinates(geom);
+            return geom;
+        }
+    }
+}
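
A minimal sketch (not part of this commit) of the scalar functions through the Table API,
in the same style as FunctionTest below; a table with a "geom_point" geometry column is
assumed.

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

    import org.apache.flink.table.api.Table;

    class FunctionSketch {
        static Table examples(Table pointTable) {
            // ST_Buffer: buffer each geometry by 1 unit, keeping the column name
            Table buffered = pointTable.select(call("ST_Buffer", $("geom_point"), 1.0).as("geom_point"));
            // ST_Transform: reproject from WGS84 to Web Mercator (GeoTools-backed)
            return buffered.select(call("ST_Transform", $("geom_point"), "epsg:4326", "epsg:3857"));
        }
    }
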
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/JoinOperators.java b/flink/src/main/java/org/apache/sedona/flink/expressions/JoinOperators.java
new file mode 100644
index 0000000..28fd759
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/JoinOperators.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.types.Row;
+import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
+import org.locationtech.jts.geom.Geometry;
+
+import java.util.Iterator;
+import java.util.Set;
+
+public class JoinOperators {
+
+    @FunctionHint(output = @DataTypeHint("ROW<_joinKey INT NOT NULL>"))
+    public static class ST_Keys extends TableFunction {
+        PartitioningUtils partitioner;
+        public ST_Keys(PartitioningUtils partitioner){
+            this.partitioner = partitioner;
+        }
+        public void eval(@DataTypeHint("RAW") Object o) {
+            Geometry geom = (Geometry) o;
+            Set<Integer> keys = partitioner.getKeys(geom);
+            Iterator<Integer> it = keys.iterator();
+            while (it.hasNext()) {
+                collect(Row.of(it.next()));
+            }
+        }
+    }
+
+    public static class ST_Key1 extends ScalarFunction {
+        PartitioningUtils partitioner;
+        public ST_Key1(PartitioningUtils partitioner){
+            this.partitioner = partitioner;
+        }
+        public Integer eval(@DataTypeHint("RAW") Object o) {
+            Geometry geom = (Geometry) o;
+            Set<Integer> keys = partitioner.getKeys(geom);
+            return keys.iterator().next();
+        }
+    }
+}
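
A minimal sketch (not part of this commit) of ST_Keys: a table function that expands a
geometry into the partition keys of every grid cell it overlaps, so both join sides can be
re-keyed before a keyed spatial predicate. How the PartitioningUtils instance is built, and
the "point_table" name, are assumptions made for illustration.

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
    import org.apache.sedona.flink.expressions.JoinOperators;

    class JoinKeySketch {
        static Table examples(StreamTableEnvironment tableEnv, PartitioningUtils partitioner) {
            // Bind the table function to a precomputed spatial partitioner
            tableEnv.createTemporarySystemFunction("ST_Keys", new JoinOperators.ST_Keys(partitioner));
            // Each row is emitted once per overlapping grid cell, carrying that cell's _joinKey
            return tableEnv.sqlQuery(
                    "SELECT geom_point, name_point, _joinKey " +
                    "FROM point_table, LATERAL TABLE(ST_Keys(geom_point))");
        }
    }
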
diff --git a/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
new file mode 100644
index 0000000..c1a8e13
--- /dev/null
+++ b/flink/src/main/java/org/apache/sedona/flink/expressions/Predicates.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink.expressions;
+
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.sedona.core.joinJudgement.JudgementHelper;
+import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
+import org.apache.sedona.core.utils.HalfOpenRectangle;
+import org.locationtech.jts.geom.Envelope;
+import org.locationtech.jts.geom.Geometry;
+
+import java.util.List;
+import java.util.Objects;
+
+public class Predicates {
+    public static class ST_Intersects extends ScalarFunction {
+        private List<Envelope> grids;
+
+        /**
+         * Constructor for duplicate removal
+         */
+        public ST_Intersects(PartitioningUtils partitioner) {
+            grids = partitioner.fetchLeafZones();
+        }
+
+        /**
+         * Constructor for relation checking without duplicate removal
+         */
+        public ST_Intersects() {
+        }
+
+        @DataTypeHint("Boolean")
+        public Boolean eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+            Geometry geom1 = (Geometry) o1;
+            Geometry geom2 = (Geometry) o2;
+            return geom1.intersects(geom2);
+        }
+
+        /**
+         * Check the spatial relation with duplicate removal
+         * @param key the partition key (grid cell id) under which this pair is being evaluated
+         * @param o1 the left geometry
+         * @param o2 the right geometry
+         * @return true if the geometries intersect and this partition should report the pair
+         */
+        @DataTypeHint("Boolean")
+        public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+            Objects.requireNonNull(grids, "This predicate has to be initialized by a partitioner.");
+            Geometry geom1 = (Geometry) o1;
+            Geometry geom2 = (Geometry) o2;
+            HalfOpenRectangle halfOpenRectangle = new HalfOpenRectangle(grids.get(key));
+            return JudgementHelper.match(geom1, geom2, halfOpenRectangle, true);
+        }
+    }
+
+    public static class ST_Contains extends ScalarFunction {
+        private List<Envelope> grids;
+
+        /**
+         * Constructor for duplicate removal
+         */
+        public ST_Contains(PartitioningUtils partitioner) {
+            grids = partitioner.fetchLeafZones();
+        }
+
+        /**
+         * Constructor for relation checking without duplicate removal
+         */
+        public ST_Contains() {
+        }
+
+        @DataTypeHint("Boolean")
+        public Boolean eval(@DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+            Geometry geom1 = (Geometry) o1;
+            Geometry geom2 = (Geometry) o2;
+            return geom1.covers(geom2);
+        }
+
+        /**
+         * Check the spatial relation with duplicate removal
+         * @param key the partition key (grid cell id) under which this pair is being evaluated
+         * @param o1 the left geometry
+         * @param o2 the right geometry
+         * @return true if the left geometry covers the right one and this partition should report the pair
+         */
+        @DataTypeHint("Boolean")
+        public Boolean eval(@DataTypeHint("INT") Integer key, @DataTypeHint("RAW") Object o1, @DataTypeHint("RAW") Object o2) {
+            Objects.requireNonNull(grids, "This predicate has to be initialized by a partitioner.");
+            Geometry geom1 = (Geometry) o1;
+            Geometry geom2 = (Geometry) o2;
+            HalfOpenRectangle halfOpenRectangle = new HalfOpenRectangle(grids.get(key));
+            return JudgementHelper.match(geom1, geom2, halfOpenRectangle, false);
+        }
+    }
+}
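
A minimal sketch (not part of this commit) contrasting the two eval signatures: the
two-argument form is a plain relation check, while the three-argument form takes the
partition key and delegates to JudgementHelper so each pair is reported by only one
partition. Column names are assumptions borrowed from the tests; the keyed form requires
the predicate to be registered with a partitioner, e.g. new Predicates.ST_Intersects(partitioner).

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

    import org.apache.flink.table.api.Table;

    class PredicateSketch {
        static Table examples(Table joinedTable) {
            // Plain check (may report the same pair from several partitions):
            //   joinedTable.filter(call("ST_Intersects", $("geom_point"), $("geom_polygon")))
            // Keyed check: only the partition owning the reference point keeps the pair
            return joinedTable.filter(call("ST_Intersects", $("_joinKey"), $("geom_point"), $("geom_polygon")));
        }
    }
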
diff --git a/flink/src/test/java/org.apache.sedona/flink/ConstructorTest.java b/flink/src/test/java/org.apache.sedona/flink/ConstructorTest.java
new file mode 100644
index 0000000..af14c99
--- /dev/null
+++ b/flink/src/test/java/org.apache.sedona/flink/ConstructorTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+import org.apache.sedona.flink.expressions.Constructors;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.junit.Assert.assertEquals;
+
+public class ConstructorTest extends TestBase{
+
+    @BeforeClass
+    public static void onceExecutedBeforeAll() {
+        initialize();
+    }
+
+    @Test
+    public void testPointFromText() {
+        List<Row> data = createPointWKT(testDataSize);
+        Row result = last(createPointTable(testDataSize));
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
+    @Test
+    public void testPolygonFromText() {
+        List<Row> data = createPolygonWKT(testDataSize);
+        Row result = last(createPolygonTable(testDataSize));
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
+    @Test
+    public void testGeomFromWKT() {
+        List<Row> data = createPolygonWKT(testDataSize);
+        Table wktTable = createTextTable(data, polygonColNames);
+        Table geomTable = wktTable.select(call(Constructors.ST_GeomFromWKT.class.getSimpleName(),
+                $(polygonColNames[0])).as(polygonColNames[0]),
+                $(polygonColNames[1]));
+        Row result = last(geomTable);
+        assertEquals(result.toString(), data.get(data.size() - 1).toString());
+    }
+
+    @Test
+    public void testPolygonFromEnvelope() {
+        Double minX = 1.0;
+        Double minY = 100.0;
+        Double maxX = 2.0;
+        Double maxY = 200.0;
+        Coordinate[] coordinates = new Coordinate[5];
+        coordinates[0] = new Coordinate(minX, minY);
+        coordinates[1] = new Coordinate(minX, maxY);
+        coordinates[2] = new Coordinate(maxX, maxY);
+        coordinates[3] = new Coordinate(maxX, minY);
+        coordinates[4] = coordinates[0];
+        GeometryFactory geometryFactory = new GeometryFactory();
+        Geometry geom = geometryFactory.createPolygon(coordinates);
+        assertEquals(geom.toString(), last(tableEnv.sqlQuery("SELECT ST_PolygonFromEnvelope(1, 100, 2, 200)"))
+                .getField(0).toString());
+        assertEquals(geom.toString(), last(tableEnv.sqlQuery("SELECT ST_PolygonFromEnvelope(1.0, 100.0, 2.0, 200.0)"))
+                .getField(0).toString());
+
+    }
+}
diff --git a/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
new file mode 100644
index 0000000..c8074bb
--- /dev/null
+++ b/flink/src/test/java/org.apache.sedona/flink/FunctionTest.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.table.api.Table;
+import org.apache.sedona.flink.expressions.Functions;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.locationtech.jts.geom.Geometry;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+import static org.junit.Assert.assertEquals;
+
+public class FunctionTest extends TestBase{
+    @BeforeClass
+    public static void onceExecutedBeforeAll() {
+        initialize();
+    }
+
+    @Test
+    public void testFlipCoordinates() {
+        Table pointTable = createPointTable_real(testDataSize);
+        Table flippedTable = pointTable.select(call(Functions.ST_FlipCoordinates.class.getSimpleName(), $(pointColNames[0])));
+        Geometry result = (Geometry) first(flippedTable).getField(0);
+        assertEquals("POINT (-118 32)", result.toString());
+    }
+
+    @Test
+    public void testTransform() {
+        Table pointTable = createPointTable_real(testDataSize);
+        Table transformedTable = pointTable.select(call(Functions.ST_Transform.class.getSimpleName(), $(pointColNames[0])
+                , "epsg:4326", "epsg:3857"));
+        String result = first(transformedTable).getField(0).toString();
+        assertEquals("POINT (-13135699.91360628 3763310.6271446524)", result);
+    }
+
+    @Test
+    public void testDistance() {
+        Table pointTable = createPointTable(testDataSize);
+        pointTable = pointTable.select(call(Functions.ST_Distance.class.getSimpleName(), $(pointColNames[0])
+                , call("ST_GeomFromWKT", "POINT (0 0)")));
+        assertEquals(0.0, first(pointTable).getField(0));
+    }
+}
diff --git a/flink/src/test/java/org.apache.sedona/flink/PredicateTest.java b/flink/src/test/java/org.apache.sedona/flink/PredicateTest.java
new file mode 100644
index 0000000..0b3594a
--- /dev/null
+++ b/flink/src/test/java/org.apache.sedona/flink/PredicateTest.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.table.api.Table;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PredicateTest extends TestBase{
+    @BeforeClass
+    public static void onceExecutedBeforeAll() {
+        initialize();
+    }
+
+    @Test
+    public void testIntersects() {
+        Table pointTable = createPointTable(testDataSize);
+        String polygon = createPolygonWKT(testDataSize).get(0).getField(0).toString();
+        String expr = "ST_Intersects(ST_GeomFromWkt('" + polygon + "'), geom_point)";
+        Table result = pointTable.filter(expr);
+        assertEquals(count(result), 1);
+    }
+}
diff --git a/flink/src/test/java/org.apache.sedona/flink/TestBase.java b/flink/src/test/java/org.apache.sedona/flink/TestBase.java
new file mode 100644
index 0000000..372f4ee
--- /dev/null
+++ b/flink/src/test/java/org.apache.sedona/flink/TestBase.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sedona.flink;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.sedona.core.enums.GridType;
+import org.apache.sedona.core.spatialPartitioning.PartitioningUtils;
+import org.apache.sedona.flink.expressions.Constructors;
+import org.locationtech.jts.geom.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+public class TestBase {
+    protected static StreamExecutionEnvironment env;
+    protected static StreamTableEnvironment tableEnv;
+    static int testDataSize = 1000;
+    static String[] pointColNames = {"geom_point", "name_point"};
+    static String[] polygonColNames = {"geom_polygon", "name_polygon"};
+    static String pointTableName = "point_table";
+    static String polygonTableName = "polygon_table";
+
+    public void setTestDataSize(int testDataSize) {
+        this.testDataSize = testDataSize;
+    }
+
+    static void initialize() {
+        initialize(false);
+    }
+
+    static void initialize(boolean enableWebUI) {
+        Logger.getLogger("org").setLevel(Level.WARN);
+        env = enableWebUI? StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration()):
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+        tableEnv = StreamTableEnvironment.create(env, settings);
+        SedonaFlinkRegistrator.registerType(env);
+        SedonaFlinkRegistrator.registerFunc(tableEnv);
+    }
+
+    static List<Row> createPointText(int size){
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create a number of points (0, 0), (1, 1), (2, 2), ...
+            data.add(Row.of(i + "," + i, "point" + i));
+        }
+        return data;
+    }
+
+    static List<Point> creatPoint(int size){
+        List<Point> data = new ArrayList<>();
+        GeometryFactory geomFact = new GeometryFactory();
+        for (int i = 0; i < size; i++) {
+            // Create a number of points (0, 0), (1, 1), (2, 2), ...
+            data.add(geomFact.createPoint(new Coordinate(i, i)));
+        }
+        return data;
+    }
+
+    // Simulate some points in the US
+    static List<Row> createPointText_real(int size){
+        List<Row> data = new ArrayList<>();
+        for (double i = 0; i < 10.0; i = i + 10.0/size) {
+            double x = 32.0 + i;
+            double y = -118.0 + i;
+            data.add(Row.of(x + "," + y, "point"));
+        }
+        return data;
+    }
+
+    static List<Row> createPointWKT(int size){
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create a number of points (0, 0), (1, 1), (2, 2), ...
+            data.add(Row.of("POINT (" + i + " " + i +")", "point" + i));
+        }
+        return data;
+    }
+
+    static List<Row> createPolygonText(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create polygons, each of which matches exactly one of the points above
+            // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+            List<String> polygon = new ArrayList<>();
+            polygon.add(minX);polygon.add(minY);
+            polygon.add(minX);polygon.add(maxY);
+            polygon.add(maxX);polygon.add(maxY);
+            polygon.add(maxX);polygon.add(minY);
+            polygon.add(minX);polygon.add(minY);
+            data.add(Row.of(String.join(",", polygon), "polygon" + i));
+        }
+        return data;
+    }
+
+    static List<Row> createPolygonWKT(int size) {
+        List<Row> data = new ArrayList<>();
+        for (int i = 0; i < size; i++) {
+            // Create polygons, each of which matches exactly one of the points above
+            // Each polygon is an envelope like (-0.5, -0.5, 0.5, 0.5)
+            String minX = String.valueOf(i - 0.5);
+            String minY = String.valueOf(i - 0.5);
+            String maxX = String.valueOf(i + 0.5);
+            String maxY = String.valueOf(i + 0.5);
+            List<String> polygon = new ArrayList<>();
+            polygon.add(minX + " " + minY);
+            polygon.add(minX + " " + maxY);
+            polygon.add(maxX + " " + maxY);
+            polygon.add(maxX + " " + minY);
+            polygon.add(minX + " " + minY);
+            data.add(Row.of("POLYGON ((" + String.join(", ", polygon) + "))", "polygon" + i));
+        }
+        return data;
+    }
+
+    static Table createTextTable(List<Row> data, String[] colNames){
+        TypeInformation<?>[] colTypes = {
+                BasicTypeInfo.STRING_TYPE_INFO,
+                BasicTypeInfo.STRING_TYPE_INFO};
+        RowTypeInfo typeInfo = new RowTypeInfo(colTypes, colNames);
+        DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+        return tableEnv.fromDataStream(ds, $(colNames[0]), $(colNames[1]));
+    }
+
+    static Table createPointTextTable(int size){
+        return createTextTable(createPointText(size), pointColNames);
+    }
+
+    static Table createPointTextTable_real(int size){
+        return createTextTable(createPointText_real(size), pointColNames);
+    }
+
+    static Table createPolygonTextTable(int size) {
+        return createTextTable(createPolygonText(size), polygonColNames);
+    }
+
+    static Table createPointTable(int size){
+        return createPointTextTable(size)
+                .select(call(Constructors.ST_PointFromText.class.getSimpleName(),
+                        $(pointColNames[0])).as(pointColNames[0]),
+                        $(pointColNames[1]));
+    }
+
+    static Table createPointTable_real(int size){
+        return createPointTextTable_real(size)
+                .select(call(Constructors.ST_PointFromText.class.getSimpleName(),
+                        $(pointColNames[0])).as(pointColNames[0]),
+                        $(pointColNames[1]));
+    }
+
+    Table createPolygonTable(int size) {
+        return createPolygonTextTable(size)
+                .select(call(Constructors.ST_PolygonFromText.class.getSimpleName(),
+                        $(polygonColNames[0])).as(polygonColNames[0]),
+                        $(polygonColNames[1]));
+    }
+
+    /**
+     * Get the row iterator of a Flink table
+     * @param table the table to collect
+     * @return a closeable iterator over the table's rows
+     */
+    static CloseableIterator<Row> iterate(Table table) {
+        return table.execute().collect();
+    }
+
+    /**
+     * Iterate to the last row of a Flink table
+     * @param table the table to collect
+     * @return the last row, or Row.of(-1L) if the table is empty
+     */
+    static Row last(Table table) {
+        CloseableIterator<Row> it = iterate(table);
+        Row lastRow = Row.of(-1L);
+        while (it.hasNext()) lastRow = it.next();
+        return lastRow;
+    }
+
+    static Row first(Table table) {
+        CloseableIterator<Row> it = iterate(table);
+        assert(it.hasNext());
+        Row firstRow = it.next();
+        return firstRow;
+    }
+
+    static long count(Table table) {
+        CloseableIterator<Row> it = iterate(table);
+        long count = 0;
+        while (it.hasNext()) {
+            count++;
+            it.next();
+        }
+        return count;
+    }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 55a302c..04fb294 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,6 +40,7 @@
         <module>sql</module>
         <module>viz</module>
         <module>python-adapter</module>
+        <module>flink</module>
     </modules>
     <licenses>
         <license>