Posted to commits@sedona.apache.org by ji...@apache.org on 2022/07/13 00:43:27 UTC

[incubator-sedona] branch master updated: [DOCS] Create python-vector-osm.md (#582)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8f26a9 [DOCS] Create python-vector-osm.md (#582)
6a8f26a9 is described below

commit 6a8f26a9bd19f98e281c45e9c2babd32947b2848
Author: Ana Caroline Ferreira <an...@gmail.com>
AuthorDate: Tue Jul 12 21:43:23 2022 -0300

    [DOCS] Create python-vector-osm.md (#582)
---
 docs/tutorial/python-vector-osm.md | 140 +++++++++++++++++++++++++++++++++++++
 1 file changed, 140 insertions(+)

diff --git a/docs/tutorial/python-vector-osm.md b/docs/tutorial/python-vector-osm.md
new file mode 100644
index 00000000..e06d3b24
--- /dev/null
+++ b/docs/tutorial/python-vector-osm.md
@@ -0,0 +1,140 @@
+# Example of Spark + Sedona + HDFS with worker nodes and OSM vector data queries
+
+```
+from IPython.display import display, HTML
+from pyspark.sql import SparkSession
+from pyspark import StorageLevel
+import pandas as pd
+from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DoubleType, ArrayType
+from pyspark.sql.functions import regexp_replace, col, split, expr, udf, lit, flatten, monotonically_increasing_id
+from sedona.register import SedonaRegistrator
+from sedona.utils import SedonaKryoRegistrator, KryoSerializer
+from pywebhdfs.webhdfs import PyWebHdfsClient
+from datetime import date
+import json
+```
+
+### Creating the Spark session, configuring the executors, and registering Sedona
+
+```
+spark = SparkSession.\
+    builder.\
+    appName("Overpass-API").\
+    master("spark://spark-master:7077").\
+    config("spark.executor.memory", "15G").\
+    config("spark.driver.maxResultSize", "135G").\
+    config("spark.sql.shuffle.partitions", "500").\
+    config("spark.sql.adaptive.enabled", True).\
+    config("spark.sql.adaptive.coalescePartitions.enabled", True).\
+    config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", 125).\
+    config("spark.sql.execution.arrow.pyspark.enabled", True).\
+    config("spark.sql.execution.arrow.fallback.enabled", True).\
+    config("spark.kryoserializer.buffer.max", 2047).\
+    config("spark.serializer", KryoSerializer.getName).\
+    config("spark.kryo.registrator", SedonaKryoRegistrator.getName).\
+    config("spark.jars.packages", "org.apache.sedona:sedona-python-adapter-3.0_2.12:1.1.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2").\
+    enableHiveSupport().\
+    getOrCreate()
+
+# Register all Sedona SQL functions (ST_*) with the session
+SedonaRegistrator.registerAll(spark)
+sc = spark.sparkContext
+```
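+
+A minimal sanity check, assuming registration succeeded: any Sedona `ST_` function should now be callable from SQL.
+
+```
+# Should print a single row containing POINT (1 2)
+spark.sql("SELECT ST_Point(1.0, 2.0) AS pt").show()
+```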
+
+### Querying the Overpass API and saving the downloaded data into HDFS
+
+```
+import requests
+
+overpass_url = "http://overpass-api.de/api/interpreter"
+# Fetch every way with a "highway" tag inside the "Foz do Iguaçu" area,
+# including its geometry and the nodes it references
+overpass_query = """
+[out:json];
+area[name = "Foz do Iguaçu"];
+way(area)["highway"~""];
+out geom;
+>;
+out skel qt;
+"""
+
+response = requests.get(overpass_url,
+                        params={'data': overpass_query})
+data = response.json()
+
+# Write the raw JSON response to HDFS via WebHDFS, replacing any previous copy
+hdfs = PyWebHdfsClient(host='179.106.229.159', port='50070', user_name='root')
+file_name = "foz_roads_osm.json"
+hdfs.delete_file_dir(file_name)
+hdfs.create_file(file_name, json.dumps(data))
+```
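+
+To verify the upload landed, a small optional check (assuming the same `hdfs` client as above) asks WebHDFS for the file's status:
+
+```
+# Returns the WebHDFS FileStatus record, including the file size in bytes
+status = hdfs.get_file_dir_status(file_name)
+print(status['FileStatus']['length'], "bytes written")
+```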
+
+### Loading the saved HDFS file into a Spark DataFrame
+
+```
+# The hostname below is the HDFS namenode of this example cluster
+path = "hdfs://776faf4d6a1e:8020/" + file_name
+df = spark.read.json(path, multiLine=True)
+```
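+
+The Overpass response is a single JSON document with a nested `elements` array, so it helps to inspect the inferred schema before flattening it:
+
+```
+# elements is an array of structs holding id, geometry, nodes, tags, ...
+df.printSchema()
+```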
+
+### Querying and reshaping the data for analysis
+
+```
+from pyspark.sql.functions import explode, arrays_zip
+
+df.createOrReplaceTempView("df")
+# Count how many elements (ways and nodes) the Overpass response contains
+tb = spark.sql("select *, size(elements) total_nodes from df")
+tb.show(5)
+
+isolate_total_nodes = tb.select("total_nodes").toPandas()
+total_nodes = isolate_total_nodes["total_nodes"].iloc[0]
+print(total_nodes)
+
+isolate_ids = tb.select("elements.id").toPandas()
+ids = pd.DataFrame(isolate_ids["id"].iloc[0]).drop_duplicates()
+print(ids[0].iloc[1])
+
+formatted_df = tb\
+.withColumn("id", explode("elements.id"))
+
+formatted_df.show(5)
+
+# Zip the parallel arrays so each exploded row keeps its id, geometry,
+# node list, and tags together
+formatted_df = tb\
+.withColumn("new", arrays_zip("elements.id", "elements.geometry", "elements.nodes", "elements.tags"))\
+.withColumn("new", explode("new"))
+
+formatted_df.show(5)
+
+# formatted_df.printSchema()
+
+formatted_df = formatted_df.select("new.0", "new.1", "new.2", "new.3.maxspeed", "new.3.incline", "new.3.surface", "new.3.name", "total_nodes")
+formatted_df = formatted_df.withColumnRenamed("0", "id").withColumnRenamed("1", "geom").withColumnRenamed("2", "nodes")
+formatted_df.createOrReplaceTempView("formatted_df")
+formatted_df.show(5)
+
+# TODO: update everything from here down so the logic considers the whole line
+# Explode each way's geometry into one row per (lat, lon) point
+points_tb = spark.sql("select geom, id from formatted_df where geom IS NOT NULL")
+points_tb = points_tb\
+.withColumn("new", arrays_zip("geom.lat", "geom.lon"))\
+.withColumn("new", explode("new"))
+
+points_tb = points_tb.select("new.0", "new.1", "id")
+points_tb = points_tb.withColumnRenamed("0", "lat").withColumnRenamed("1", "lon")
+points_tb.printSchema()
+
+points_tb.createOrReplaceTempView("points_tb")
+points_tb.show(5)
+
+# Collect each way's points back into an ordered list of "lat,lon" strings
+coordinates_tb = spark.sql("select (select collect_list(CONCAT(p1.lat, ',', p1.lon)) from points_tb p1 where p1.id = p2.id group by p1.id) as coordinates, p2.id, p2.maxspeed, p2.incline, p2.surface, p2.name, p2.nodes, p2.total_nodes from formatted_df p2")
+coordinates_tb.createOrReplaceTempView("coordinates_tb")
+coordinates_tb.show(5)
+
+# Build Sedona LineString geometries from the collected coordinate lists
+roads_tb = spark.sql("SELECT ST_LineStringFromText(REPLACE(REPLACE(CAST(coordinates as string), '[', ''), ']', ''), ',') as geom, id, maxspeed, incline, surface, name, nodes, total_nodes FROM coordinates_tb WHERE coordinates IS NOT NULL")
+roads_tb.createOrReplaceTempView("roads_tb")
+roads_tb.show(5)
+```
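+
+With `roads_tb` registered, ordinary Sedona spatial SQL applies. As a closing sketch (an illustrative query under the setup above), note that the geometries are built from raw lat/lon values, so lengths come back in coordinate degrees unless the geometry is reprojected first:
+
+```
+# Ten longest road segments, length measured in coordinate degrees
+spark.sql("""
+    SELECT name, ST_Length(geom) AS length_deg
+    FROM roads_tb
+    ORDER BY length_deg DESC
+    LIMIT 10
+""").show()
+```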