Posted to commits@sedona.apache.org by ji...@apache.org on 2022/03/06 09:25:44 UTC

[incubator-sedona] branch flink-docs updated: Add Flink tutorial

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch flink-docs
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/flink-docs by this push:
     new c222a36  Add Flink tutorial
c222a36 is described below

commit c222a3671dd023065c2ceeaf2aa61351d6735a94
Author: Jia Yu <ji...@apache.org>
AuthorDate: Sun Mar 6 01:25:38 2022 -0800

    Add Flink tutorial
---
 docs/api/flink/Overview.md        |   2 +-
 docs/setup/flink/install-scala.md |   2 +-
 docs/tutorial/flink/sql.md        | 442 +++++++++++++++++++++++---------------
 3 files changed, 273 insertions(+), 173 deletions(-)

diff --git a/docs/api/flink/Overview.md b/docs/api/flink/Overview.md
index 56fb41d..07b5fcc 100644
--- a/docs/api/flink/Overview.md
+++ b/docs/api/flink/Overview.md
@@ -1,6 +1,6 @@
 # Introduction
 
-SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. Please read the programming guide: [Sedona with Flink application](../../tutorial/flink/sql.md).
+SedonaSQL supports the SQL/MM Part 3 Spatial SQL Standard. Please read the programming guide: [Sedona with Flink SQL app](../../tutorial/flink/sql.md).
 
 Sedona includes SQL operators as follows.
 
diff --git a/docs/setup/flink/install-scala.md b/docs/setup/flink/install-scala.md
index 8e7af1c..cc4f4a9 100644
--- a/docs/setup/flink/install-scala.md
+++ b/docs/setup/flink/install-scala.md
@@ -5,7 +5,7 @@ Then you can create a self-contained Scala / Java project. A self-contained proj
 To use Sedona in your self-contained Flink project, you just need to add Sedona as a dependency in your POM.xml or build.sbt.
 
 1. To add Sedona as dependencies, please read [Sedona Maven Central coordinates](maven-coordinates.md)
-2. Use Sedona Template project to start: [Sedona Template Project](/tutorial/demo/)
+2. Read the [Sedona Flink guide](/tutorial/flink/sql) and use the Sedona Template Project to get started: [Sedona Template Project](/tutorial/demo/)
 3. Compile your project using Maven. Make sure you obtain the fat jar which packages all dependencies.
 4. Submit your compiled fat jar to Flink cluster. Make sure you are in the root folder of Flink distribution. Then run the following command:
 ```
diff --git a/docs/tutorial/flink/sql.md b/docs/tutorial/flink/sql.md
index 2765b9f..bef96c8 100644
--- a/docs/tutorial/flink/sql.md
+++ b/docs/tutorial/flink/sql.md
@@ -1,141 +1,111 @@
-The page outlines the steps to manage spatial data using SedonaSQL. ==The example code is written in Scala but also works for Java==.
+This page outlines the steps to manage spatial data using SedonaSQL. ==The example code is written in Java but also works for Scala==.
 
 SedonaSQL supports SQL/MM Part3 Spatial SQL Standard. It includes four kinds of SQL operators as follows. All these operators can be directly called through:
-```Scala
-var myDataFrame = sparkSession.sql("YOUR_SQL")
+```Java
+Table myTable = tableEnv.sqlQuery("YOUR_SQL");
 ```
 
-Detailed SedonaSQL APIs are available here: [SedonaSQL API](../api/sql/Overview.md)
+Detailed SedonaSQL APIs are available here: [SedonaSQL API](/api/flink/Overview)
 
 ## Set up dependencies
 
-1. Read [Sedona Maven Central coordinates](../setup/maven-coordinates.md)
-2. Select ==the minimum dependencies==: Add [Apache Spark core](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11), [Apache SparkSQL](https://mvnrepository.com/artifact/org.apache.spark/spark-sql), Sedona-core and Sedona-SQL
-3. Add the dependencies in build.sbt or pom.xml.
+1. Read [Sedona Maven Central coordinates](/setup/maven-coordinates)
+2. Add Sedona dependencies in build.sbt or pom.xml.
+3. Add [Flink dependencies](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/overview/) in build.sbt or pom.xml.
 
-!!!note
-	To enjoy the full functions of Sedona, we suggest you include ==the full dependencies==: [Apache Spark core](https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11), [Apache SparkSQL](https://mvnrepository.com/artifact/org.apache.spark/spark-sql), Sedona-core, Sedona-SQL, Sedona-Viz. Please see [SQL example project](/tutorial/demo/)
-
-
-## Initiate SparkSession
-Use the following code to initiate your SparkSession at the beginning:
-```Scala
-var sparkSession = SparkSession.builder()
-.master("local[*]") // Delete this if run in cluster mode
-.appName("readTestScala") // Change this to a proper name
-// Enable Sedona custom Kryo serializer
-.config("spark.serializer", classOf[KryoSerializer].getName) // org.apache.spark.serializer.KryoSerializer
-.config("spark.kryo.registrator", classOf[SedonaKryoRegistrator].getName)
-.getOrCreate() // org.apache.sedona.core.serde.SedonaKryoRegistrator
-```
-
-!!!warning
-	Sedona has a suite of well-written geometry and index serializers. Forgetting to enable these serializers will lead to high memory consumption.
-
-If you add ==the Sedona full dependencies== as suggested above, please use the following two lines to enable Sedona Kryo serializer instead:
-```Scala
-.config("spark.serializer", classOf[KryoSerializer].getName) // org.apache.spark.serializer.KryoSerializer
-.config("spark.kryo.registrator", classOf[SedonaVizKryoRegistrator].getName) // org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator
+## Initiate Stream Environment
+Use the following code to initiate your `StreamExecutionEnvironment` at the beginning:
+```Java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
 ```
 
 ## Register SedonaSQL
 
-Add the following line after your SparkSession declaration
+Add the following lines after your `StreamExecutionEnvironment` and `StreamTableEnvironment` declarations
 
-```Scala
-SedonaSQLRegistrator.registerAll(sparkSession)
+```Java
+SedonaFlinkRegistrator.registerType(env);
+SedonaFlinkRegistrator.registerFunc(tableEnv);
 ```
 
-This function will register Sedona User Defined Type, User Defined Function and optimized join query strategy.
-
-You can also register everything by passing `--conf spark.sql.extensions=org.apache.sedona.sql.SedonaSqlExtensions` to `spark-submit` or `spark-shell`.
-
-## Load data from files
-
-Assume we have a WKT file, namely `usa-county.tsv`, at Path `/Download/usa-county.tsv` as follows:
+!!!warning
+	Sedona has a suite of well-written geometry and index serializers. Forgetting to enable these serializers will lead to high memory consumption.
 
-```
-POLYGON (..., ...)	Cuming County	
-POLYGON (..., ...)	Wahkiakum County
-POLYGON (..., ...)	De Baca County
-POLYGON (..., ...)	Lancaster County
-```
-The file may have many other columns.
+These calls register the Sedona User Defined Types and User Defined Functions.
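+
+To sanity-check the registration, a quick sketch (any Sedona constructor function would do) is to evaluate a constant spatial expression:
+
+```Java
+// If registration succeeded, this constant expression evaluates without errors.
+tableEnv.sqlQuery("SELECT ST_GeomFromWKT('POINT (1 2)')").execute().print();
+```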
 
-Use the following code to load the data and create a raw DataFrame:
+## Create a Geometry type column
 
-```Scala
-var rawDf = sparkSession.read.format("csv").option("delimiter", "\t").option("header", "false").load("/Download/usa-county.tsv")
-rawDf.createOrReplaceTempView("rawdf")
-rawDf.show()
-```
+All geometrical operations in SedonaSQL are on Geometry type objects. Therefore, before running any queries, you need to create a Geometry type column in your Table.
 
-The output will be like this:
+Assume you have a Flink Table `tbl` like this:
 
 ```
-|                 _c0|_c1|_c2|     _c3|  _c4|        _c5|                 _c6|_c7|_c8|  _c9|_c10| _c11|_c12|_c13|      _c14|    _c15|       _c16|        _c17|
-+--------------------+---+---+--------+-----+-----------+--------------------+---+---+-----+----+-----+----+----+----------+--------+-----------+------------+
-|POLYGON ((-97.019...| 31|039|00835841|31039|     Cuming|       Cuming County| 06| H1|G4020|null| null|null|   A|1477895811|10447360|+41.9158651|-096.7885168|
-|POLYGON ((-123.43...| 53|069|01513275|53069|  Wahkiakum|    Wahkiakum County| 06| H1|G4020|null| null|null|   A| 682138871|61658258|+46.2946377|-123.4244583|
-|POLYGON ((-104.56...| 35|011|00933054|35011|    De Baca|      De Baca County| 06| H1|G4020|null| null|null|   A|6015539696|29159492|+34.3592729|-104.3686961|
-|POLYGON ((-96.910...| 31|109|00835876|31109|  Lancaster|    Lancaster County| 06| H1|G4020| 339|30700|null|   A|2169240202|22877180|+40.7835474|-096.6886584|
++----+--------------------------------+--------------------------------+
+| op |                   geom_polygon |                   name_polygon |
++----+--------------------------------+--------------------------------+
+| +I | POLYGON ((-0.5 -0.5, -0.5 0... |                       polygon0 |
+| +I | POLYGON ((0.5 0.5, 0.5 1.5,... |                       polygon1 |
+| +I | POLYGON ((1.5 1.5, 1.5 2.5,... |                       polygon2 |
+| +I | POLYGON ((2.5 2.5, 2.5 3.5,... |                       polygon3 |
+| +I | POLYGON ((3.5 3.5, 3.5 4.5,... |                       polygon4 |
+| +I | POLYGON ((4.5 4.5, 4.5 5.5,... |                       polygon5 |
+| +I | POLYGON ((5.5 5.5, 5.5 6.5,... |                       polygon6 |
+| +I | POLYGON ((6.5 6.5, 6.5 7.5,... |                       polygon7 |
+| +I | POLYGON ((7.5 7.5, 7.5 8.5,... |                       polygon8 |
+| +I | POLYGON ((8.5 8.5, 8.5 9.5,... |                       polygon9 |
++----+--------------------------------+--------------------------------+
+10 rows in set
 ```
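+
+If you don't have such a Table handy, a minimal in-memory sketch (assuming the `tableEnv` created earlier; the two sample rows are illustrative) can produce one:
+
+```Java
+import static org.apache.flink.table.api.Expressions.row;
+
+// geom_polygon holds plain WKT strings at this point; it becomes a Geometry column below.
+Table tbl = tableEnv.fromValues(
+        row("POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))", "polygon0"),
+        row("POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))", "polygon1")
+).as("geom_polygon", "name_polygon");
+```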
 
-## Create a Geometry type column
-
-All geometrical operations in SedonaSQL are on Geometry type objects. Therefore, before any kind of queries, you need to create a Geometry type column on a DataFrame.
-
+You can create a Table with a Geometry type column as follows:
 
-```Scala
-var spatialDf = sparkSession.sql(
-  """
-    |SELECT ST_GeomFromWKT(_c0) AS countyshape, _c1, _c2
-    |FROM rawdf
-  """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+tableEnv.createTemporaryView("myTable", tbl);
+Table geomTbl = tableEnv.sqlQuery("SELECT ST_GeomFromWKT(geom_polygon) AS geom_polygon, name_polygon FROM myTable");
+geomTbl.execute().print();
 ```
 
-You can select many other attributes to compose this `spatialdDf`. The output will be something like this:
+The output will be:
 
 ```
-|                 countyshape|_c1|_c2|     _c3|  _c4|        _c5|                 _c6|_c7|_c8|  _c9|_c10| _c11|_c12|_c13|      _c14|    _c15|       _c16|        _c17|
-+--------------------+---+---+--------+-----+-----------+--------------------+---+---+-----+----+-----+----+----+----------+--------+-----------+------------+
-|POLYGON ((-97.019...| 31|039|00835841|31039|     Cuming|       Cuming County| 06| H1|G4020|null| null|null|   A|1477895811|10447360|+41.9158651|-096.7885168|
-|POLYGON ((-123.43...| 53|069|01513275|53069|  Wahkiakum|    Wahkiakum County| 06| H1|G4020|null| null|null|   A| 682138871|61658258|+46.2946377|-123.4244583|
-|POLYGON ((-104.56...| 35|011|00933054|35011|    De Baca|      De Baca County| 06| H1|G4020|null| null|null|   A|6015539696|29159492|+34.3592729|-104.3686961|
-|POLYGON ((-96.910...| 31|109|00835876|31109|  Lancaster|    Lancaster County| 06| H1|G4020| 339|30700|null|   A|2169240202|22877180|+40.7835474|-096.6886584|
++----+--------------------------------+--------------------------------+
+| op |                   geom_polygon |                   name_polygon |
++----+--------------------------------+--------------------------------+
+| +I | POLYGON ((-0.5 -0.5, -0.5 0... |                       polygon0 |
+| +I | POLYGON ((0.5 0.5, 0.5 1.5,... |                       polygon1 |
+| +I | POLYGON ((1.5 1.5, 1.5 2.5,... |                       polygon2 |
+| +I | POLYGON ((2.5 2.5, 2.5 3.5,... |                       polygon3 |
+| +I | POLYGON ((3.5 3.5, 3.5 4.5,... |                       polygon4 |
+| +I | POLYGON ((4.5 4.5, 4.5 5.5,... |                       polygon5 |
+| +I | POLYGON ((5.5 5.5, 5.5 6.5,... |                       polygon6 |
+| +I | POLYGON ((6.5 6.5, 6.5 7.5,... |                       polygon7 |
+| +I | POLYGON ((7.5 7.5, 7.5 8.5,... |                       polygon8 |
+| +I | POLYGON ((8.5 8.5, 8.5 9.5,... |                       polygon9 |
++----+--------------------------------+--------------------------------+
+10 rows in set
 ```
 
-Although it looks same with the input, but actually the type of column countyshape has been changed to ==Geometry== type.
+Although it looks the same as the input, the type of the geom_polygon column has actually been changed to ==Geometry== type.
 
 To verify this, use the following code to print the schema of the Table:
 
-```Scala
-spatialDf.printSchema()
+```Java
+geomTbl.printSchema();
 ```
 
 The output will be like this:
 
 ```
-root
- |-- countyshape: geometry (nullable = false)
- |-- _c1: string (nullable = true)
- |-- _c2: string (nullable = true)
- |-- _c3: string (nullable = true)
- |-- _c4: string (nullable = true)
- |-- _c5: string (nullable = true)
- |-- _c6: string (nullable = true)
- |-- _c7: string (nullable = true)
+(
+  `geom_polygon` RAW('org.locationtech.jts.geom.Geometry', '...'),
+  `name_polygon` STRING
+)
 ```
 
 !!!note
-	SedonaSQL provides lots of functions to create a Geometry column, please read [SedonaSQL constructor API](../api/sql/Constructor.md).
-	
-## Load Shapefile and GeoJSON
-
-Shapefile and GeoJSON must be loaded by SpatialRDD and converted to DataFrame using Adapter. Please read [Load SpatialRDD](../rdd/#create-a-generic-spatialrdd) and [DataFrame <-> RDD](#convert-between-dataframe-and-spatialrdd).
-
+	SedonaSQL provides lots of functions to create a Geometry column, please read [SedonaSQL constructor API](/api/flink/Constructor).
 
 ## Transform the Coordinate Reference System
 
@@ -143,14 +113,9 @@ Sedona doesn't control the coordinate unit (degree-based or meter-based) of all
 
 To convert Coordinate Reference System of the Geometry column created before, use the following code:
 
-```Scala
-spatialDf = sparkSession.sql(
-  """
-    |SELECT ST_Transform(countyshape, "epsg:4326", "epsg:3857") AS newcountyshape, _c1, _c2, _c3, _c4, _c5, _c6, _c7
-    |FROM spatialdf
-  """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+Table geomTbl3857 = tableEnv.sqlQuery("SELECT ST_Transform(geom_polygon, 'epsg:4326', 'epsg:3857') AS geom_polygon, name_polygon FROM myTable");
+geomTbl3857.execute().print();
 ```
 
 The first EPSG code EPSG:4326 in `ST_Transform` is the source CRS of the geometries. It is WGS84, the most common degree-based CRS.
@@ -159,17 +124,46 @@ The second EPSG code EPSG:3857 in `ST_Transform` is the target CRS of the geomet
 
 This `ST_Transform` call transforms the CRS of these geometries from EPSG:4326 to EPSG:3857. Detailed CRS information can be found on [EPSG.io](https://epsg.io/).
 
-The coordinates of polygons have been changed. The output will be like this:
+!!!note
+	Read [SedonaSQL ST_Transform API](/api/flink/Function/#st_transform) to learn more about this function.
+
+For example, a Table whose coordinates are in the US will be transformed like this.
 
+Before the transformation:
+```
++----+--------------------------------+--------------------------------+
+| op |                     geom_point |                     name_point |
++----+--------------------------------+--------------------------------+
+| +I |                POINT (32 -118) |                          point |
+| +I |                POINT (33 -117) |                          point |
+| +I |                POINT (34 -116) |                          point |
+| +I |                POINT (35 -115) |                          point |
+| +I |                POINT (36 -114) |                          point |
+| +I |                POINT (37 -113) |                          point |
+| +I |                POINT (38 -112) |                          point |
+| +I |                POINT (39 -111) |                          point |
+| +I |                POINT (40 -110) |                          point |
+| +I |                POINT (41 -109) |                          point |
++----+--------------------------------+--------------------------------+
 ```
-+--------------------+---+---+--------+-----+-----------+--------------------+---+
-|      newcountyshape|_c1|_c2|     _c3|  _c4|        _c5|                 _c6|_c7|
-+--------------------+---+---+--------+-----+-----------+--------------------+---+
-|POLYGON ((-108001...| 31|039|00835841|31039|     Cuming|       Cuming County| 06|
-|POLYGON ((-137408...| 53|069|01513275|53069|  Wahkiakum|    Wahkiakum County| 06|
-|POLYGON ((-116403...| 35|011|00933054|35011|    De Baca|      De Baca County| 06|
-|POLYGON ((-107880...| 31|109|00835876|31109|  Lancaster|    Lancaster County| 06|
 
+After the transformation:
+
+```
++----+--------------------------------+--------------------------------+
+| op |                            _c0 |                     name_point |
++----+--------------------------------+--------------------------------+
+| +I | POINT (-13135699.91360628 3... |                          point |
+| +I | POINT (-13024380.422813008 ... |                          point |
+| +I | POINT (-12913060.932019735 ... |                          point |
+| +I | POINT (-12801741.44122646 4... |                          point |
+| +I | POINT (-12690421.950433187 ... |                          point |
+| +I | POINT (-12579102.459639912 ... |                          point |
+| +I | POINT (-12467782.96884664 4... |                          point |
+| +I | POINT (-12356463.478053367 ... |                          point |
+| +I | POINT (-12245143.987260092 ... |                          point |
+| +I | POINT (-12133824.496466817 ... |                          point |
++----+--------------------------------+--------------------------------+
 ```
 
 
@@ -179,105 +173,211 @@ After creating a Geometry type column, you are able to run spatial queries.
 
 ### Range query
 
-Use ==ST_Contains==, ==ST_Intersects==, ==ST_Within== to run a range query over a single column.
+Use ==ST_Contains==, ==ST_Intersects==, and so on to run a range query over a single column.
 
 The following example finds all geometries that are within the given query window:
 
-```Scala
-spatialDf = sparkSession.sql(
-  """
-    |SELECT *
-    |FROM spatialdf
-    |WHERE ST_Contains (ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape)
-  """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+Table geomTable = tableEnv.sqlQuery(
+    "SELECT * FROM myTable " +
+    "WHERE ST_Contains(ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0), ST_GeomFromWKT(geom_polygon))");
+geomTable.execute().print();
 ```
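+
+The same pattern works with the other predicates. For instance, a sketch using ==ST_Intersects== (assuming the `myTable` view registered earlier; the query window values are illustrative):
+
+```Java
+// Keep only the geometries that intersect the query window.
+Table intersectsTbl = tableEnv.sqlQuery(
+    "SELECT * FROM myTable " +
+    "WHERE ST_Intersects(ST_PolygonFromEnvelope(0.0, 0.0, 5.0, 5.0), ST_GeomFromWKT(geom_polygon))");
+intersectsTbl.execute().print();
+```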
 
 !!!note
-	Read [SedonaSQL constructor API](../api/sql/Constructor.md) to learn how to create a Geometry type query window
+	Read [SedonaSQL Predicate API](/api/flink/Predicate) to learn about different spatial query predicates.
+	
 ### KNN query
 
 Use ==ST_Distance== to calculate the distance and rank results by it.
 
 The following code returns the 5 nearest neighbors of the given polygon.
 
-```Scala
-spatialDf = sparkSession.sql(
-  """
-    |SELECT countyname, ST_Distance(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), newcountyshape) AS distance
-    |FROM spatialdf
-    |ORDER BY distance DESC
-    |LIMIT 5
-  """.stripMargin)
-spatialDf.createOrReplaceTempView("spatialdf")
-spatialDf.show()
+```Java
+geomTable = tableEnv.sqlQuery(
+    "SELECT name_polygon, ST_Distance(ST_PolygonFromEnvelope(1.0, 100.0, 1000.0, 1100.0), ST_GeomFromWKT(geom_polygon)) AS distance " +
+    "FROM myTable ORDER BY distance ASC LIMIT 5");
+geomTable.execute().print();
 ```
 
-### Join query
+## Convert Spatial Table to Spatial DataStream
 
-The details of a join query is available here [Join query](../api/sql/Optimizer.md).
+### Get DataStream
 
-### Other queries
+Use TableEnv's `toDataStream` function:
 
-There are lots of other functions can be combined with these queries. Please read [SedonaSQL functions](../api/sql/Function.md) and [SedonaSQL aggregate functions](../api/sql/AggregateFunction.md).
+```Java
+DataStream<Row> geomStream = tableEnv.toDataStream(geomTable);
+```
 
-## Save to permanent storage
+### Retrieve Geometries
 
-To save a Spatial DataFrame to some permanent storage such as Hive tables and HDFS, you can simply convert each geometry in the Geometry type column back to a plain String and save the plain DataFrame to wherever you want.
+Then get the Geometry from each Row object using a map transformation:
 
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.locationtech.jts.geom.Geometry;
 
-Use the following code to convert the Geometry column in a DataFrame back to a WKT string column:
-```Scala
-var stringDf = sparkSession.sql(
-  """
-    |SELECT ST_AsText(countyshape)
-    |FROM polygondf
-  """.stripMargin)
+DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row, Geometry>() {
+            @Override
+            public Geometry map(Row value) throws Exception {
+                return (Geometry) value.getField(0);
+            }
+        });
+geometries.print();
 ```
 
-!!!note
-	ST_AsGeoJSON is also available. We would like to invite you to contribute more functions
+The output will be:
 
+```
+14> POLYGON ((1.5 1.5, 1.5 2.5, 2.5 2.5, 2.5 1.5, 1.5 1.5))
+2> POLYGON ((5.5 5.5, 5.5 6.5, 6.5 6.5, 6.5 5.5, 5.5 5.5))
+5> POLYGON ((8.5 8.5, 8.5 9.5, 9.5 9.5, 9.5 8.5, 8.5 8.5))
+16> POLYGON ((3.5 3.5, 3.5 4.5, 4.5 4.5, 4.5 3.5, 3.5 3.5))
+12> POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))
+13> POLYGON ((0.5 0.5, 0.5 1.5, 1.5 1.5, 1.5 0.5, 0.5 0.5))
+15> POLYGON ((2.5 2.5, 2.5 3.5, 3.5 3.5, 3.5 2.5, 2.5 2.5))
+3> POLYGON ((6.5 6.5, 6.5 7.5, 7.5 7.5, 7.5 6.5, 6.5 6.5))
+1> POLYGON ((4.5 4.5, 4.5 5.5, 5.5 5.5, 5.5 4.5, 4.5 4.5))
+4> POLYGON ((7.5 7.5, 7.5 8.5, 8.5 8.5, 8.5 7.5, 7.5 7.5))
+```
 
-## Convert between DataFrame and SpatialRDD
+### Store non-spatial attributes in Geometries
 
-### DataFrame to SpatialRDD
+You can concatenate other non-spatial attributes and store them in the Geometry's `userData` field so you can recover them later on. The `userData` field can hold any Object.
 
-Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD. Please read [Adapter Scaladoc](/api/javadoc/sql/org/apache/sedona/sql/utils/index.html)
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.locationtech.jts.geom.Geometry;
 
-```Scala
-var spatialRDD = Adapter.toSpatialRdd(spatialDf, "usacounty")
+DataStream<Geometry> geometries = geomStream.map(new MapFunction<Row, Geometry>() {
+            @Override
+            public Geometry map(Row value) throws Exception {
+                Geometry geom = (Geometry) value.getField(0);
+                geom.setUserData(value.getField(1));
+                return geom;
+            }
+        });
+geometries.print();
 ```
 
-"usacounty" is the name of the geometry column
+The `print` command will not print out the `userData` field, but you can get it this way:
 
-!!!warning
-	Only one Geometry type column is allowed per DataFrame.
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.locationtech.jts.geom.Geometry;
+
+geometries.map(new MapFunction<Geometry, String>() {
+            @Override
+            public String map(Geometry value) throws Exception
+            {
+                return (String) value.getUserData();
+            }
+        }).print();
+```
+
+The output will be:
+
+```
+13> polygon9
+6> polygon2
+10> polygon6
+11> polygon7
+5> polygon1
+12> polygon8
+8> polygon4
+4> polygon0
+7> polygon3
+9> polygon5
+```
 	
-### SpatialRDD to DataFrame
+## Convert Spatial DataStream to Spatial Table
+
+### Create Geometries using Sedona FormatUtils
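+
+The snippets below assume `text` is a `DataStream<String>` whose records match the format each example parses. For a quick local test, such a stream might be built in memory (a sketch; `env` is the `StreamExecutionEnvironment` created earlier, with WKT records fitting the first example):
+
+```Java
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+// Illustrative WKT records; swap in delimiter-based records for the later examples.
+DataStream<String> text = env.fromElements(
+        "POLYGON ((-0.5 -0.5, -0.5 0.5, 0.5 0.5, 0.5 -0.5, -0.5 -0.5))",
+        "POINT (1 1)");
+```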
 
-Use SedonaSQL DataFrame-RDD Adapter to convert a DataFrame to an SpatialRDD. Please read [Adapter Scaladoc](/api/javadoc/sql/org/apache/sedona/sql/utils/index.html)
+* Create a Geometry from a WKT string
 
-```Scala
-var spatialDf = Adapter.toDf(spatialRDD, sparkSession)
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.sedona.core.enums.FileDataSplitter;
+import org.apache.sedona.core.formatMapper.FormatUtils;
+import org.locationtech.jts.geom.Geometry;
+
+DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
+            @Override
+            public Geometry map(String value) throws Exception
+            {
+                FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
+                return formatUtils.readGeometry(value);
+            }
+        });
 ```
 
-All other attributes such as price and age will be also brought to the DataFrame as long as you specify ==carryOtherAttributes== (see [Read other attributes in an SpatialRDD](../rdd#read-other-attributes-in-an-spatialrdd)).
+* Create a Point from a String `1.1, 2.2`. Use `,` as the delimiter.
 
-### SpatialPairRDD to DataFrame
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.sedona.core.enums.GeometryType;
+import org.apache.sedona.core.formatMapper.FormatUtils;
+import org.locationtech.jts.geom.Geometry;
 
-PairRDD is the result of a spatial join query or distance join query. SedonaSQL DataFrame-RDD Adapter can convert the result to a DataFrame. But you need to provide the name of other attributes.
+DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
+            @Override
+            public Geometry map(String value) throws Exception
+            {
+                FormatUtils<Geometry> formatUtils = new FormatUtils(",", false, GeometryType.POINT);
+                return formatUtils.readGeometry(value);
+            }
+        });
+```
 
-```Scala
-var joinResultDf = Adapter.toDf(joinResultPairRDD, Seq("left_attribute1", "left_attribute2"), Seq("right_attribute1", "right_attribute2"), sparkSession)
+* Create a Polygon from a String `1.1, 1.1, 10.1, 10.1`. This is a rectangle with (1.1, 1.1) and (10.1, 10.1) as its min/max corners.
+
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+
+DataStream<Geometry> geometries = text.map(new MapFunction<String, Geometry>() {
+            @Override
+            public Geometry map(String value) throws Exception
+            {
+                // Write some code to get four double type values: minX, minY, maxX, maxY
+                ...
+                Coordinate[] coordinates = new Coordinate[5];
+                coordinates[0] = new Coordinate(minX, minY);
+                coordinates[1] = new Coordinate(minX, maxY);
+                coordinates[2] = new Coordinate(maxX, maxY);
+                coordinates[3] = new Coordinate(maxX, minY);
+                coordinates[4] = coordinates[0];
+                GeometryFactory geometryFactory = new GeometryFactory();
+                return geometryFactory.createPolygon(coordinates);
+            }
+        });
 ```
 
-or you can use the attribute names directly from the input RDD
+### Create Row objects
+
+Put each geometry in a Flink Row to form a `geomStream`. Note that you can put other attributes in the Row as well. This example uses a constant value `myName` for all geometries.
 
-```Scala
-import scala.collection.JavaConversions._
-var joinResultDf = Adapter.toDf(joinResultPairRDD, leftRdd.fieldNames, rightRdd.fieldNames, sparkSession)
+```Java
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.types.Row;
+import org.apache.sedona.core.enums.FileDataSplitter;
+import org.apache.sedona.core.formatMapper.FormatUtils;
+
+DataStream<Row> geomStream = text.map(new MapFunction<String, Row>() {
+            @Override
+            public Row map(String value) throws Exception
+            {
+                FormatUtils formatUtils = new FormatUtils(FileDataSplitter.WKT, false);
+                return Row.of(formatUtils.readGeometry(value), "myName");
+            }
+        });
 ```
 
-All other attributes such as price and age will be also brought to the DataFrame as long as you specify ==carryOtherAttributes== (see [Read other attributes in an SpatialRDD](../rdd#read-other-attributes-in-an-spatialrdd)).
+### Get Spatial Table
+
+Use TableEnv's `fromDataStream` function, with two column names `geom` and `geom_name`:
+```Java
+import static org.apache.flink.table.api.Expressions.$;
+Table geomTable = tableEnv.fromDataStream(geomStream, $("geom"), $("geom_name"));
+```