You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2022/07/13 00:39:14 UTC
[incubator-sedona] branch master updated: [DOCS] Create NdviSentinelApacheSedona (#645)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 27b1e38f [DOCS] Create NdviSentinelApacheSedona (#645)
27b1e38f is described below
commit 27b1e38f43e80dd29634013c7237f3da732a1d12
Author: Ana Caroline Ferreira <an...@gmail.com>
AuthorDate: Tue Jul 12 21:39:09 2022 -0300
[DOCS] Create NdviSentinelApacheSedona (#645)
---
binder/NdviSentinelApacheSedona | 962 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 962 insertions(+)
diff --git a/binder/NdviSentinelApacheSedona b/binder/NdviSentinelApacheSedona
new file mode 100644
index 00000000..97eb8c18
--- /dev/null
+++ b/binder/NdviSentinelApacheSedona
@@ -0,0 +1,962 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "id": "778f8d92-8bf3-4e39-8b2e-42c644af4734",
+ "metadata": {},
+ "outputs": [],
+ "source": [
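+ "# pip install scikit-learn  # 'sklearn' is a deprecated PyPI alias\n",
+ "# pip install pyarrow\n",
+ "# pip install fsspec\n",
+ "# pip install apache-sedona pywebhdfs  # imported in the next cell"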
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "id": "7a0ae142-47e9-4e04-8187-3a93baaa2cb5",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Imports for the whole notebook: standard library first, then third-party (pandas, PySpark, Sedona, WebHDFS).\n",
+ "from datetime import date\n",
+ "\n",
+ "import pandas as pd\n",
+ "from IPython.display import display, HTML\n",
+ "from pyspark import StorageLevel\n",
+ "from pyspark.sql import SparkSession\n",
+ "from pyspark.sql.functions import col, expr, flatten, lit, regexp_replace, split, udf\n",
+ "from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType, DoubleType, ArrayType\n",
+ "from pywebhdfs.webhdfs import PyWebHdfsClient\n",
+ "from sedona.register import SedonaRegistrator\n",
+ "from sedona.utils import SedonaKryoRegistrator, KryoSerializer"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "id": "4edd3ce3-7e1f-409b-8a7b-acdeb0659835",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 3,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import os\n",
+ "\n",
+ "# Per-run output folder on HDFS, suffixed with today's ISO date (e.g. analise_teste_2022-01-06).\n",
+ "analise_folder = \"analise_teste_\" + str(date.today())\n",
+ "\n",
+ "# WebHDFS connection settings; override through environment variables instead of editing the notebook.\n",
+ "HDFS_HOST = os.environ.get(\"HDFS_HOST\", \"179.106.229.159\")\n",
+ "HDFS_PORT = os.environ.get(\"HDFS_PORT\", \"50070\")\n",
+ "HDFS_USER = os.environ.get(\"HDFS_USER\", \"root\")\n",
+ "hdfs = PyWebHdfsClient(host=HDFS_HOST, port=HDFS_PORT, user_name=HDFS_USER)\n",
+ "\n",
+ "# Recursively delete today's output folder so this run starts from an empty location.\n",
+ "hdfs.delete_file_dir(analise_folder, recursive=True)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "id": "6b3d9ae8",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "Warning: Ignoring non-Spark config property: spark.sql.adaptive.coalescePartitions.enabled\n",
+ "Ivy Default Cache set to: /root/.ivy2/cache\n",
+ "The jars for the packages stored in: /root/.ivy2/jars\n",
+ ":: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml\n",
+ "org.apache.sedona#sedona-python-adapter-3.0_2.12 added as a dependency\n",
+ "org.datasyslab#geotools-wrapper added as a dependency\n",
+ ":: resolving dependencies :: org.apache.spark#spark-submit-parent-475a8539-e626-41a7-8cca-cb3c72ad1694;1.0\n",
+ "\tconfs: [default]\n",
+ "\tfound org.apache.sedona#sedona-python-adapter-3.0_2.12;1.1.0-incubating in central\n",
+ "\tfound org.locationtech.jts#jts-core;1.18.0 in central\n",
+ "\tfound org.wololo#jts2geojson;0.16.1 in central\n",
+ "\tfound com.fasterxml.jackson.core#jackson-databind;2.12.2 in central\n",
+ "\tfound com.fasterxml.jackson.core#jackson-annotations;2.12.2 in central\n",
+ "\tfound com.fasterxml.jackson.core#jackson-core;2.12.2 in central\n",
+ "\tfound org.apache.sedona#sedona-core-3.0_2.12;1.1.0-incubating in central\n",
+ "\tfound org.apache.sedona#sedona-sql-3.0_2.12;1.1.0-incubating in central\n",
+ "\tfound org.datasyslab#geotools-wrapper;1.1.0-25.2 in central\n",
+ ":: resolution report :: resolve 503ms :: artifacts dl 8ms\n",
+ "\t:: modules in use:\n",
+ "\tcom.fasterxml.jackson.core#jackson-annotations;2.12.2 from central in [default]\n",
+ "\tcom.fasterxml.jackson.core#jackson-core;2.12.2 from central in [default]\n",
+ "\tcom.fasterxml.jackson.core#jackson-databind;2.12.2 from central in [default]\n",
+ "\torg.apache.sedona#sedona-core-3.0_2.12;1.1.0-incubating from central in [default]\n",
+ "\torg.apache.sedona#sedona-python-adapter-3.0_2.12;1.1.0-incubating from central in [default]\n",
+ "\torg.apache.sedona#sedona-sql-3.0_2.12;1.1.0-incubating from central in [default]\n",
+ "\torg.datasyslab#geotools-wrapper;1.1.0-25.2 from central in [default]\n",
+ "\torg.locationtech.jts#jts-core;1.18.0 from central in [default]\n",
+ "\torg.wololo#jts2geojson;0.16.1 from central in [default]\n",
+ "\t:: evicted modules:\n",
+ "\torg.locationtech.jts#jts-core;1.18.1 by [org.locationtech.jts#jts-core;1.18.0] in [default]\n",
+ "\t---------------------------------------------------------------------\n",
+ "\t| | modules || artifacts |\n",
+ "\t| conf | number| search|dwnlded|evicted|| number|dwnlded|\n",
+ "\t---------------------------------------------------------------------\n",
+ "\t| default | 10 | 0 | 0 | 1 || 9 | 0 |\n",
+ "\t---------------------------------------------------------------------\n",
+ ":: retrieving :: org.apache.spark#spark-submit-parent-475a8539-e626-41a7-8cca-cb3c72ad1694\n",
+ "\tconfs: [default]\n",
+ "\t0 artifacts copied, 9 already retrieved (0kB/5ms)\n",
+ "22/01/06 20:04:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n",
+ "Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties\n",
+ "Setting default log level to \"WARN\".\n",
+ "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
+ "22/01/06 20:04:45 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "22/01/06 20:04:45 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ " \r"
+ ]
+ }
+ ],
+ "source": [
+ "# Optional: .config(\"spark.scheduler.mode\", \"FAIR\")\n",
+ "spark = (\n",
+ "    SparkSession.builder\n",
+ "    .appName(\"Sentinel-app\")\n",
+ "    # Only one master takes effect (a later .master() overrides an earlier one); use \"local[*]\" to run on this machine.\n",
+ "    .master(\"spark://spark-master:7077\")\n",
+ "    .config(\"spark.executor.memory\", \"15G\")\n",
+ "    .config(\"spark.driver.maxResultSize\", \"135G\")\n",
+ "    .config(\"spark.sql.shuffle.partitions\", \"500\")\n",
+ "    # Adaptive query execution. Config keys must not carry a leading space, otherwise Spark ignores them.\n",
+ "    .config(\"spark.sql.adaptive.enabled\", True)\n",
+ "    .config(\"spark.sql.adaptive.coalescePartitions.enabled\", True)\n",
+ "    .config(\"spark.sql.adaptive.coalescePartitions.initialPartitionNum\", 125)\n",
+ "    # Arrow-based pandas conversion; the old 'spark.sql.execution.arrow.fallback.enabled' key is deprecated since Spark 3.0.\n",
+ "    .config(\"spark.sql.execution.arrow.pyspark.enabled\", True)\n",
+ "    .config(\"spark.sql.execution.arrow.pyspark.fallback.enabled\", True)\n",
+ "    # Kryo serialization with Sedona's Kryo registrator.\n",
+ "    .config(\"spark.kryoserializer.buffer.max\", 2047)\n",
+ "    .config(\"spark.serializer\", KryoSerializer.getName)\n",
+ "    .config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName)\n",
+ "    .config(\"spark.jars.packages\", \"org.apache.sedona:sedona-python-adapter-3.0_2.12:1.1.0-incubating,org.datasyslab:geotools-wrapper:1.1.0-25.2\")\n",
+ "    .enableHiveSupport()\n",
+ "    .getOrCreate()\n",
+ ")\n",
+ "\n",
+ "# Register Sedona's SQL functions (ST_*, RS_*) used by the cells below.\n",
+ "SedonaRegistrator.registerAll(spark)\n",
+ "sc = spark.sparkContext"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "id": "68018b95",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ " \r"
+ ]
+ }
+ ],
+ "source": [
+ "# Glob over the per-feature folders of GeoTIFF images stored on HDFS.\n",
+ "DATA_DIR = \"hdfs://776faf4d6a1e:8020/sentinel2_tmp/*\"\n",
+ "df = (\n",
+ "    spark.read.format(\"geotiff\")\n",
+ "    .option(\"dropInvalid\", True)  # drop images the GeoTIFF reader considers invalid\n",
+ "    .load(DATA_DIR)\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "id": "6c538082-3500-4a2c-ba93-16eb6f141dad",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "From local[5]4\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Diagnostic only: how many partitions Spark uses by default when parallelizing a small local collection.\n",
+ "# Note: (0, 20) is a two-element tuple, not range(0, 20).\n",
+ "rdd = spark.sparkContext.parallelize((0, 20))\n",
+ "print(\"Default partitions: \" + str(rdd.getNumPartitions()))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "id": "e2308653-b592-41e4-b495-6eb1efbdd0c1",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- image: struct (nullable = true)\n",
+ " | |-- origin: string (nullable = true)\n",
+ " | |-- wkt: string (nullable = true)\n",
+ " | |-- height: integer (nullable = true)\n",
+ " | |-- width: integer (nullable = true)\n",
+ " | |-- nBands: integer (nullable = true)\n",
+ " | |-- data: array (nullable = true)\n",
+ " | | |-- element: double (containsNull = true)\n",
+ "\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "22/01/06 20:04:57 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Mark the raw GeoTIFF frame for caching (materialized by the first action), then show its schema.\n",
+ "df.cache().printSchema()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "id": "de05ddb3-83e8-40ed-b3b4-cbbf90ae08a8",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "== Physical Plan ==\n",
+ "*(1) Project [image#14, monotonically_increasing_id() AS id#22L]\n",
+ "+- InMemoryTableScan [image#14]\n",
+ " +- InMemoryRelation [image#14], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
+ " +- FileScan geotiff [image#14] Batched: false, DataFilters: [], Format: org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat@5f9f8a31, Location: InMemoryFileIndex[hdfs://776faf4d6a1e:8020/sentinel2_tmp/1, hdfs://776faf4d6a1e:8020/sentinel2_tm..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<image:struct<origin:string,wkt:string,height:int,width:int,nBands:int,data:array<double>>>\n",
+ "\n",
+ "\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[Stage 4:> (0 + 1) / 1]\r"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+---+\n",
+ "| image| id|\n",
+ "+--------------------+---+\n",
+ "|[hdfs://776faf4d6...| 0|\n",
+ "|[hdfs://776faf4d6...| 1|\n",
+ "|[hdfs://776faf4d6...| 2|\n",
+ "|[hdfs://776faf4d6...| 3|\n",
+ "|[hdfs://776faf4d6...| 4|\n",
+ "+--------------------+---+\n",
+ "only showing top 5 rows\n",
+ "\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ " \r"
+ ]
+ }
+ ],
+ "source": [
+ "from pyspark.sql.functions import monotonically_increasing_id\n",
+ "\n",
+ "# Attach a row id; monotonically_increasing_id() is unique and increasing, but not guaranteed consecutive.\n",
+ "df_index = df.withColumn(\"id\", monotonically_increasing_id())\n",
+ "df_index.explain()\n",
+ "df_index.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "id": "d0017088-c448-48af-97fd-4607db94c0de",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "[('id', 'bigint'), ('origin', 'string'), ('height', 'int'), ('width', 'int'), ('data', 'string'), ('bands', 'int')]\n",
+ "== Physical Plan ==\n",
+ "*(1) Project [id#22L, image#14.origin AS origin#65, image#14.height AS height#66, image#14.width AS width#67, cast(image#14.data as string) AS data#68, image#14.nBands AS bands#69]\n",
+ "+- *(1) Project [image#14, monotonically_increasing_id() AS id#22L]\n",
+ " +- InMemoryTableScan [image#14]\n",
+ " +- InMemoryRelation [image#14], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
+ " +- FileScan geotiff [image#14] Batched: false, DataFilters: [], Format: org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat@5f9f8a31, Location: InMemoryFileIndex[hdfs://776faf4d6a1e:8020/sentinel2_tmp/1, hdfs://776faf4d6a1e:8020/sentinel2_tm..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<image:struct<origin:string,wkt:string,height:int,width:int,nBands:int,data:array<double>>>\n",
+ "\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Flatten the image struct into plain columns; the pixel array is cast to a string.\n",
+ "# (\"image.wkt as Geom\" is not part of this export.)\n",
+ "export_columns = [\n",
+ "    \"id\",\n",
+ "    \"image.origin as origin\",\n",
+ "    \"image.height as height\",\n",
+ "    \"image.width as width\",\n",
+ "    \"cast(image.data as string) as data\",\n",
+ "    \"image.nBands as bands\",\n",
+ "]\n",
+ "df_export = df_index.selectExpr(*export_columns)\n",
+ "print(df_export.dtypes)\n",
+ "df_export.explain()\n",
+ "df_export.createOrReplaceTempView(\"df_export\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "id": "25ad0246-fc8f-42ca-866e-209a471c180f",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# df_export.repartition(\"origin\").write.format('csv').option('header', True).partitionBy(\"origin\").mode('overwrite').option('sep', ',').save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)\n",
+ "# df_export.write.format('csv').option('header', True).option('sep', ',').save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)\n",
+ "# start = 0\n",
+ "# end = 10\n",
+ "# part_df_export = spark.sql('select * from df_export where id between '+str(start)+' and '+str(end))\n",
+ "# part_df_export.show(7)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "id": "0026d557-5926-4759-8c98-5424a0e54b81",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# df_writer = part_df_export.write.format('csv').option('header', True).option('sep', ',')\n",
+ "# df_writer.save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "id": "a0516441-a907-4b68-bf23-a2ad95490eb7",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# A single row can be large enough to exhaust memory, so neither collect() nor writing the Spark\n",
+ "# DataFrame directly works here; rows have to be moved to pandas a few at a time instead.\n",
+ "# This cell exports only the first row of df_export to a local CSV file.\n",
+ "part_df_export = df_export.take(1)\n",
+ "# take() returns a list of Row objects; pass the column names explicitly so the CSV header keeps them.\n",
+ "pd.DataFrame(part_df_export, columns=df_export.columns).to_csv(\"teste.csv\", sep=',', encoding='utf-8')"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "id": "5b870bc9",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "22/01/06 20:05:02 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+ "| origin| Geom|height|width| data|bands|\n",
+ "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.546...| 186| 300|[409.0, 404.0, 41...| 4|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.546...| 186| 300|[1838.0, 1778.0, ...| 4|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.274...| 199| 257|[931.0, 971.0, 95...| 4|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.274...| 199| 257|[957.0, 995.0, 97...| 4|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.274...| 199| 257|[428.0, 428.0, 43...| 4|\n",
+ "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+ "only showing top 5 rows\n",
+ "\n",
+ "[('origin', 'string'), ('Geom', 'udt'), ('height', 'int'), ('width', 'int'), ('data', 'array<double>'), ('bands', 'int')]\n",
+ "== Physical Plan ==\n",
+ "InMemoryTableScan [origin#111, Geom#112, height#113, width#114, data#115, bands#116]\n",
+ " +- InMemoryRelation [origin#111, Geom#112, height#113, width#114, data#115, bands#116], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
+ " +- Project [image#14.origin AS origin#111, st_geomfromwkt(image#14.wkt) AS Geom#112, image#14.height AS height#113, image#14.width AS width#114, image#14.data AS data#115, image#14.nBands AS bands#116]\n",
+ " +- InMemoryTableScan [image#14]\n",
+ " +- InMemoryRelation [image#14], StorageLevel(disk, memory, deserialized, 1 replicas)\n",
+ " +- FileScan geotiff [image#14] Batched: false, DataFilters: [], Format: org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat@5f9f8a31, Location: InMemoryFileIndex[hdfs://776faf4d6a1e:8020/sentinel2_tmp/1, hdfs://776faf4d6a1e:8020/sentinel2_tm..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<image:struct<origin:string,wkt:string,height:int,width:int,nBands:int,data:array<double>>>\n",
+ "\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Unpack the image struct into columns and parse its WKT polygon into a Sedona geometry (Geom).\n",
+ "df = (\n",
+ "    df.selectExpr(\n",
+ "        \"image.origin as origin\",\n",
+ "        \"ST_GeomFromWkt(image.wkt) as Geom\",\n",
+ "        \"image.height as height\",\n",
+ "        \"image.width as width\",\n",
+ "        \"image.data as data\",\n",
+ "        \"image.nBands as bands\",\n",
+ "    )\n",
+ "    .cache()\n",
+ ")\n",
+ "df.show(5)\n",
+ "print(df.dtypes)\n",
+ "df.explain()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "id": "bc0ff12f-f9bf-48bd-b0c4-874446cd4dd5",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "22/01/06 20:05:03 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "[Stage 7:> (0 + 1) / 1]\r"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "| origin| Geom| B2| B3| B4| B8| constant_evi_2| constant_evi_1| constant_evi_3| constant_tgi_1| constant_tgi_2| corrector|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.546...|[409.0, 404.0, 41...|[713.0, 673.0, 70...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.546...|[1838.0, 1778.0, ...|[1074.0, 1026.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.274...|[931.0, 971.0, 95...|[1282.0, 1356.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.274...|[957.0, 995.0, 97...|[1282.0, 1354.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|hdfs://776faf4d6a...|POLYGON ((-54.274...|[428.0, 428.0, 43...|[880.0, 874.0, 79...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "only showing top 5 rows\n",
+ "\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ " \r"
+ ]
+ }
+ ],
+ "source": [
+ "# Split the flat pixel array into bands: 1 = B2 (blue), 2 = B3 (green), 3 = B4 (red), 4 = B8 (NIR).\n",
+ "# The RS_Array columns are constant per-pixel bands used as operands of the RS_* band algebra below.\n",
+ "# corrector converts digital numbers to reflectance in [0, 1]: Sentinel-2 stores reflectance x 10000,\n",
+ "# matching the '/ 10000' reference formulas used for the indices, hence 0.0001 (0.001 gives values > 1).\n",
+ "df = df.selectExpr(\n",
+ "    \"origin\",\n",
+ "    \"Geom\",\n",
+ "    \"RS_GetBand(data, 1,bands) as B2\",\n",
+ "    \"RS_GetBand(data, 2,bands) as B3\",\n",
+ "    \"RS_GetBand(data, 3,bands) as B4\",\n",
+ "    \"RS_GetBand(data, 4,bands) as B8\",\n",
+ "    \"RS_Array(height * width, 2.4) as constant_evi_2\",\n",
+ "    \"RS_Array(height * width, 2.5) as constant_evi_1\",\n",
+ "    \"RS_Array(height * width, 1.0) as constant_evi_3\",\n",
+ "    \"RS_Array(height * width, -0.5) as constant_tgi_1\",\n",
+ "    \"RS_Array(height * width, 120.0) as constant_tgi_2\",\n",
+ "    \"RS_Array(height * width, 0.0001) as corrector\",\n",
+ ").cache()\n",
+ "df.createOrReplaceTempView(\"allbands\")\n",
+ "df.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "id": "ebfbf8b5-dfac-4a45-a8d8-a3529c6426b7",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "[Stage 9:> (0 + 1) / 1]\r"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "| image_date|feature_name| origin| Geom| B2| B3| B4| B8| constant_evi_2| constant_evi_1| constant_evi_3| constant_tgi_1| constant_tgi_2| corrector|\n",
+ "+-------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "|2021-12-26 13:42:12| 70|hdfs://776faf4d6a...|POLYGON ((-54.546...|[409.0, 404.0, 41...|[713.0, 673.0, 70...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|2021-12-21 13:42:05| 70|hdfs://776faf4d6a...|POLYGON ((-54.546...|[1838.0, 1778.0, ...|[1074.0, 1026.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|2021-12-26 13:42:12| 3|hdfs://776faf4d6a...|POLYGON ((-54.274...|[931.0, 971.0, 95...|[1282.0, 1356.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|2021-12-26 13:42:12| 3|hdfs://776faf4d6a...|POLYGON ((-54.274...|[957.0, 995.0, 97...|[1282.0, 1354.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "|2021-12-16 13:42:11| 3|hdfs://776faf4d6a...|POLYGON ((-54.274...|[428.0, 428.0, 43...|[880.0, 874.0, 79...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[2.4, 2.4, 2.4, 2...|[2.5, 2.5, 2.5, 2...|[1.0, 1.0, 1.0, 1...|[-0.5, -0.5, -0.5...|[120.0, 120.0, 12...|[0.001, 0.001, 0....|\n",
+ "+-------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "only showing top 5 rows\n",
+ "\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ " \r"
+ ]
+ }
+ ],
+ "source": [
+ "# The rasters carry neither an acquisition date nor the feature they belong to, so both are parsed from the path.\n",
+ "# With DATA_DIR = hdfs://<host>:<port>/sentinel2_tmp/<feature>/<file>, SPLIT(origin,'/') yields\n",
+ "# ['hdfs:', '', '<host>:<port>', 'sentinel2_tmp', '<feature>', '<file>']: index 4 is the feature folder,\n",
+ "# index 5 the file name, whose second '_'-separated token is the timestamp (e.g. 20211226T134212).\n",
+ "origin = df.selectExpr(\"origin\")\n",
+ "split_origin = spark.sql(\n",
+ "    \"select to_timestamp(REPLACE(SPLIT(SPLIT(origin,'/')[5], '_')[1],'T',' '),'yyyyMMdd HHmmss') as image_date, \"\n",
+ "    \"SPLIT(origin,'/')[4] as feature_name, * from allbands\"\n",
+ ")\n",
+ "split_origin.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "id": "c5617a09-a3ba-4769-bbd9-9420ad7d9fb8",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "22/01/06 20:05:06 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.\n",
+ "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Scale each band by the corrector column to obtain reflectance, then derive the intermediate\n",
+ "# band-algebra columns used by the index formulas. selectExpr cannot reference an alias defined in the\n",
+ "# same projection, hence the chain of projections.\n",
+ "# NOTE: the recorded index outputs (gci = 1.0 and gli = 1.0 where nir = red = 0) imply RS_SubtractBands(a, b)\n",
+ "# evaluates b - a in this Sedona version, so e.g. sub_nirn_redn holds redn - nirn. Verify against the Sedona docs.\n",
+ "# Only the final frame is cached: caching every step would pin one extra copy of all pixel-array columns per step.\n",
+ "correct_origin = split_origin.selectExpr(\n",
+ "    \"RS_MultiplyBands(B2, corrector) as bluen\",\n",
+ "    \"RS_MultiplyBands(B3, corrector) as greenn\",\n",
+ "    \"RS_MultiplyBands(B4, corrector) as redn\",\n",
+ "    \"RS_MultiplyBands(B8, corrector) as nirn\",\n",
+ "    \"*\",\n",
+ ")\n",
+ "correct_origin = correct_origin.selectExpr(\n",
+ "    # GNDVI = (nirn - greenn) / (nirn + greenn), operands ordered for RS_SubtractBands(a, b) = b - a.\n",
+ "    \"RS_DivideBands(RS_SubtractBands(greenn, nirn), RS_AddBands(nirn, greenn)) as gndvi\",\n",
+ "    \"RS_SubtractBands(nirn, redn) as sub_nirn_redn\",\n",
+ "    \"RS_AddBands(nirn,constant_evi_2) as add_nirn_contant_evi_2\",\n",
+ "    \"RS_AddBands(redn, constant_evi_3) as add_redn_contant_evi_3\",\n",
+ "    \"RS_DivideBands(nirn, greenn) as div_nirn_greenn\",\n",
+ "    \"RS_SubtractBands(greenn, redn) as sub_greenn_redn\",\n",
+ "    \"RS_SubtractBands(redn, greenn) as sub_redn_greenn\",\n",
+ "    \"RS_SubtractBands(redn, bluen) as sub_redn_bluen\",\n",
+ "    \"RS_AddBands(greenn, redn) as add_greenn_redn\",\n",
+ "    \"*\",\n",
+ ")\n",
+ "correct_origin = correct_origin.selectExpr(\n",
+ "    \"RS_SubtractBands(add_greenn_redn, bluen) as greenn_redn_sub_bluen\",\n",
+ "    \"RS_AddBands(add_greenn_redn, bluen) as greenn_redn_add_bluen\",\n",
+ "    \"RS_SubtractBands(sub_greenn_redn, bluen) as sub_greenn_redn_bluen\",\n",
+ "    \"RS_SubtractBands(sub_redn_greenn, constant_tgi_2) as sub_red_gren_tgi_2\",\n",
+ "    \"*\",\n",
+ ")\n",
+ "correct_origin = correct_origin.selectExpr(\"RS_MultiplyFactor(sub_redn_bluen,120) as ms_redn_bluen_120\", \"*\")\n",
+ "correct_origin = correct_origin.selectExpr(\"RS_MultiplyFactor(sub_redn_greenn,190) as ms_redn_greenn_190\", \"*\")\n",
+ "correct_origin = correct_origin.selectExpr(\n",
+ "    \"RS_SubtractBands(ms_redn_greenn_190,ms_redn_bluen_120) as sub_msrg_190_msrb_120\",\n",
+ "    \"*\",\n",
+ ").cache()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "id": "1efbe09b-e174-4e95-adca-e788618b4ef0",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "22/01/06 20:05:06 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n",
+ "[Stage 10:> (0 + 1) / 1]\r"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+\n",
+ "| gndvi| evi| gci| vari| gli| tgi| origin| image_date|feature_name|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+\n",
+ "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[2.35, 2.5, 2.45,...|[1.0, 1.0, 1.0, 1...|[43.1949999999999...|hdfs://776faf4d6a...|2021-12-26 13:42:12| 70|\n",
+ "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[-1.41, -1.36, -1...|[1.0, 1.0, 1.0, 1...|[-8.25, -9.210000...|hdfs://776faf4d6a...|2021-12-21 13:42:05| 70|\n",
+ "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[3.65, 3.52, 3.9,...|[1.0, 1.0, 1.0, 1...|[65.93, 70.560000...|hdfs://776faf4d6a...|2021-12-26 13:42:12| 3|\n",
+ "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[3.94, 3.77, 4.17...|[1.0, 1.0, 1.0, 1...|[64.37, 68.929999...|hdfs://776faf4d6a...|2021-12-26 13:42:12| 3|\n",
+ "|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|[1.0, 1.0, 1.0, 1...|[1.95, 1.96, 2.19...|[1.0, 1.0, 1.0, 1...|[57.9199999999999...|hdfs://776faf4d6a...|2021-12-16 13:42:11| 3|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+------------+\n",
+ "only showing top 5 rows\n",
+ "\n",
+ "root\n",
+ " |-- gndvi: array (nullable = false)\n",
+ " | |-- element: double (containsNull = true)\n",
+ " |-- evi: array (nullable = false)\n",
+ " | |-- element: double (containsNull = true)\n",
+ " |-- gci: array (nullable = false)\n",
+ " | |-- element: double (containsNull = true)\n",
+ " |-- vari: array (nullable = false)\n",
+ " | |-- element: double (containsNull = true)\n",
+ " |-- gli: array (nullable = false)\n",
+ " | |-- element: double (containsNull = true)\n",
+ " |-- tgi: array (nullable = false)\n",
+ " | |-- element: double (containsNull = true)\n",
+ " |-- origin: string (nullable = true)\n",
+ " |-- image_date: timestamp (nullable = true)\n",
+ " |-- feature_name: string (nullable = true)\n",
+ "\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ " \r"
+ ]
+ }
+ ],
+ "source": [
+ "# Reference formulas (reflectance = digital number / 10000):\n",
+ "#   bluen = src.read(1, masked=True) / 10000\n",
+ "#   greenn = src.read(2, masked=True) / 10000\n",
+ "#   redn = src.read(3, masked=True) / 10000\n",
+ "#   nirn = src.read(4, masked=True) / 10000\n",
+ "#   evi = 2.5 * (nirn - redn) / (nirn + 2.4 * redn + 1)\n",
+ "#   gci = (nirn / greenn) - 1\n",
+ "#   gli = (2 * greenn - redn - bluen) / (2 * greenn + redn + bluen)\n",
+ "#   gndvi = (nirn - greenn) / (nirn + greenn)\n",
+ "#   tgi = (-0.5) * (190 * (redn - greenn) - 120 * (redn - bluen))\n",
+ "#   vari = (greenn - redn) / (greenn + redn - bluen)\n",
+ "#\n",
+ "# NOTE: the recorded outputs (gci = 1.0 and gli = 1.0 where nir = red = 0) imply RS_SubtractBands(a, b)\n",
+ "# evaluates b - a in this Sedona version; operands below are ordered for that. Verify against the Sedona docs.\n",
+ "# Under that order sub_greenn_redn = redn - greenn and greenn_redn_sub_bluen = bluen - (greenn + redn), so their\n",
+ "# ratio is VARI, and sub_msrg_190_msrb_120 = 190*(redn - greenn) - 120*(redn - bluen), so tgi matches its formula.\n",
+ "calculated = correct_origin.selectExpr(\n",
+ "    # gndvi = (nirn - greenn) / (nirn + greenn)\n",
+ "    \"RS_DivideBands(RS_SubtractBands(greenn, nirn), RS_AddBands(nirn, greenn)) as gndvi\",\n",
+ "    # evi = 2.5 * (nirn - redn) / (nirn + 2.4 * redn + 1)\n",
+ "    \"RS_DivideBands(RS_MultiplyBands(constant_evi_1, RS_SubtractBands(redn, nirn)), RS_AddBands(RS_AddBands(nirn, RS_MultiplyBands(constant_evi_2, redn)), constant_evi_3)) as evi\",\n",
+ "    # gci = nirn / greenn - 1\n",
+ "    \"RS_SubtractBands(constant_evi_3, div_nirn_greenn) as gci\",\n",
+ "    # vari = (greenn - redn) / (greenn + redn - bluen)\n",
+ "    \"RS_DivideBands(sub_greenn_redn, greenn_redn_sub_bluen) as vari\",\n",
+ "    # gli = (2 * greenn - redn - bluen) / (2 * greenn + redn + bluen)\n",
+ "    \"RS_DivideBands(RS_SubtractBands(bluen, RS_SubtractBands(redn, RS_MultiplyFactor(greenn, 2))), RS_AddBands(RS_AddBands(RS_MultiplyFactor(greenn, 2), redn), bluen)) as gli\",\n",
+ "    # tgi = -0.5 * (190 * (redn - greenn) - 120 * (redn - bluen))\n",
+ "    \"RS_MultiplyBands(constant_tgi_1,sub_msrg_190_msrb_120) as tgi\",\n",
+ "    \"origin\",\n",
+ "    \"image_date\",\n",
+ "    \"feature_name\",\n",
+ ").cache()\n",
+ "calculated.show(5)\n",
+ "calculated.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "82026c96-13ec-4555-b6a8-cf5b3072fb50",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "id": "aa2da947-3cb8-4050-aec7-860d24c4cdad",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "22/01/06 20:05:11 WARN SQLConf: The SQL config 'spark.sql.execution.arrow.fallback.enabled' has been deprecated in Spark v3.0 and may be removed in the future. Use 'spark.sql.execution.arrow.pyspark.fallback.enabled' instead of it.\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+---+---+----+---+------+--------------------+-------------------+------------+\n",
+ "|gndvi|evi|gci|vari|gli| tgi| origin| image_date|feature_name|\n",
+ "+-----+---+---+----+---+------+--------------------+-------------------+------------+\n",
+ "| 0.0|0.0|1.0| 0.0|1.0| -64.0|hdfs://776faf4d6a...|2021-12-26 13:42:12| 70|\n",
+ "| 0.0|0.0|1.0| 0.0|1.0|-94.53|hdfs://776faf4d6a...|2021-12-21 13:42:05| 70|\n",
+ "| 0.0|0.0|1.0| 0.0|1.0|-49.08|hdfs://776faf4d6a...|2021-12-26 13:42:12| 3|\n",
+ "| 0.0|0.0|1.0| 0.0|1.0|-50.85|hdfs://776faf4d6a...|2021-12-26 13:42:12| 3|\n",
+ "| 0.0|0.0|1.0| 0.0|1.0| -23.8|hdfs://776faf4d6a...|2021-12-16 13:42:11| 3|\n",
+ "+-----+---+---+----+---+------+--------------------+-------------------+------------+\n",
+ "only showing top 5 rows\n",
+ "\n",
+ "root\n",
+ " |-- gndvi: double (nullable = false)\n",
+ " |-- evi: double (nullable = false)\n",
+ " |-- gci: double (nullable = false)\n",
+ " |-- vari: double (nullable = false)\n",
+ " |-- gli: double (nullable = false)\n",
+ " |-- tgi: double (nullable = false)\n",
+ " |-- origin: string (nullable = true)\n",
+ " |-- image_date: timestamp (nullable = true)\n",
+ " |-- feature_name: string (nullable = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Reduce each index array column to a single mean value (RS_Mean), keeping the column\n",
+ "# names, and carry the image metadata columns through unchanged.\n",
+ "mean_index_names = [\"gndvi\", \"evi\", \"gci\", \"vari\", \"gli\", \"tgi\"]\n",
+ "mean_exprs = [f\"RS_Mean({name}) as {name}\" for name in mean_index_names]\n",
+ "\n",
+ "calculated_mean = calculated.selectExpr(*mean_exprs, \"origin\", \"image_date\", \"feature_name\").cache()\n",
+ "calculated_mean.show(5)\n",
+ "calculated_mean.printSchema()\n",
+ "\n",
+ "# Register the result as the temp view \"all_mean\" for Spark SQL queries.\n",
+ "calculated_mean.createOrReplaceTempView(\"all_mean\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 35,
+ "id": "95d0a7d6-751e-4e01-bae7-52ddd1efc40c",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "[Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-64.0, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/70/20211226T134211_20211226T134212_T21JYM.tif', image_date=datetime.datetime(2021, 12, 26, 13, 42, 12), feature_name='70'), Row(gndvi=0.0, evi=0.0, gci=1.0, vari=0.0, gli=1.0, tgi=-94.53, origin='hdfs://776faf4d6a1e:8020/sentinel2_tmp/70/20211221T134209_20211221T134205_T21JYM.tif', image_date=datetime.datetime(2021, 12, 21, 13, 42, 5), feature_name='70'), Row(gndvi=0.0, e [...]
+ ]
+ }
+ ],
+ "source": [
+ "# A single row can be large enough to exhaust memory, so neither collect() on the full\n",
+ "# DataFrame nor saving the Spark DataFrame directly works here; instead only a small\n",
+ "# sample is collected and converted to pandas before writing it out.\n",
+ "part_df_export = calculated_mean.limit(10).collect()\n",
+ "# pyspark Row objects do not pass their field names to pd.DataFrame(), so supply the\n",
+ "# column names explicitly; otherwise the CSV header would just be 0..N-1.\n",
+ "part_pdf_export = pd.DataFrame(part_df_export, columns=calculated_mean.columns)\n",
+ "part_pdf_export.to_csv(\"teste.csv\", sep=',', encoding='utf-8')\n",
+ "part_pdf_export"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "id": "35bd0426-05c7-4ca5-ba47-9066ff5917d4",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "Garbage collector: collected 199 objects.\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Run a garbage-collection pass in this Python kernel to release unreachable objects.\n",
+ "# Author's note: saving a copy to HDFS (the commented-out write in the next cell) fails\n",
+ "# with the same \"insufficient threshold\" problem that occurs during fit.\n",
+ "import gc\n",
+ "\n",
+ "collected = gc.collect()\n",
+ "print(f\"Garbage collector: collected {collected} objects.\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "id": "0b71708b-2731-43e8-a0fb-fe1c61e5e27f",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Optional: save a copy of calculated_mean to HDFS as CSV (with header), partitioned by origin.\n",
+ "# Disabled - per the author's note it fails with the same \"insufficient threshold\" problem\n",
+ "# seen during fit. Expects `analise_folder` to be defined beforehand.\n",
+ "# calculated_mean.repartition(\"origin\").write.format('csv').option('header', True).partitionBy(\"origin\").mode('overwrite').option('sep', ',').save(\"hdfs://776faf4d6a1e:8020/\"+analise_folder)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "id": "2a1e1b63-fad0-4ca0-9a87-e14ac6d19d49",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Random Forest\n",
+ "from pyspark.ml import Pipeline\n",
+ "from pyspark.ml.classification import RandomForestClassifier\n",
+ "from pyspark.ml.linalg import Vectors\n",
+ "from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler\n",
+ "from pyspark.ml.evaluation import MulticlassClassificationEvaluator\n",
+ "from pyspark.ml.tuning import ParamGridBuilder\n",
+ "import numpy as np\n",
+ "from pyspark.ml.tuning import CrossValidator\n",
+ "from pyspark.ml.evaluation import RegressionEvaluator\n",
+ "from pyspark.ml.feature import OneHotEncoder"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "id": "03c86d20-0110-4f7c-826f-4a6c2fedec22",
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- vari: double (nullable = false)\n",
+ "\n",
+ "+----+\n",
+ "|vari|\n",
+ "+----+\n",
+ "| 0.0|\n",
+ "| 0.0|\n",
+ "| 0.0|\n",
+ "| 0.0|\n",
+ "| 0.0|\n",
+ "+----+\n",
+ "only showing top 5 rows\n",
+ "\n",
+ "+----+-----+---+------+---+------+--------------------+\n",
+ "|vari|gndvi|evi| tgi|gli|labels| features|\n",
+ "+----+-----+---+------+---+------+--------------------+\n",
+ "| 0.0| 0.0|0.0| -64.0|1.0| 70|(5,[3,4],[-64.0,1...|\n",
+ "| 0.0| 0.0|0.0|-94.53|1.0| 70|(5,[3,4],[-94.53,...|\n",
+ "| 0.0| 0.0|0.0|-49.08|1.0| 3|(5,[3,4],[-49.08,...|\n",
+ "| 0.0| 0.0|0.0|-50.85|1.0| 3|(5,[3,4],[-50.85,...|\n",
+ "| 0.0| 0.0|0.0| -23.8|1.0| 3|(5,[3,4],[-23.8,1...|\n",
+ "+----+-----+---+------+---+------+--------------------+\n",
+ "only showing top 5 rows\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "# Quick look at one of the index columns.\n",
+ "vari = calculated_mean.select('vari')\n",
+ "vari.printSchema()\n",
+ "vari.show(5)\n",
+ "\n",
+ "# Model input: the mean indices as features, feature_name cast to long as the label.\n",
+ "df_rf_assembler = calculated_mean.selectExpr(\"vari\",\"gndvi\",\"evi\",\"tgi\",\"gli\",\"cast(feature_name as long) as labels\")\n",
+ "\n",
+ "# Format required by fit: pack every non-label column into a single \"features\" vector.\n",
+ "feature_list = list(filter(lambda name: name != 'labels', df_rf_assembler.columns))\n",
+ "assembler = VectorAssembler(inputCols=feature_list, outputCol=\"features\")\n",
+ "df_rf_assembler = assembler.transform(df_rf_assembler)\n",
+ "df_rf_assembler.show(5)\n",
+ "\n",
+ "# Training is not wired up yet:\n",
+ "# rf = RandomForestClassifier(labelCol=\"labels\", featuresCol=\"features\")\n",
+ "# (trainingData, testData) = df_rf_assembler.randomSplit([0.8, 0.2])\n",
+ "# trainingData.show(5)\n",
+ "# testData.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "id": "2f3eea9a-b924-4c36-9c5b-d98f639b1b62",
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "True"
+ ]
+ },
+ "execution_count": 24,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "# Recursively remove the HDFS path \"teste\"; the call's return value is displayed.\n",
+ "# NOTE(review): `hdfs` is a client created outside this section - confirm how it resolves relative paths.\n",
+ "hdfs.delete_file_dir(\n",
+ "    \"teste\",\n",
+ "    recursive=True,\n",
+ ")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "id": "977932bc-ad5a-43c9-af05-4c9a8b7ac17e",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Imports for the RandomForestClassifier reference snippet below.\n",
+ "# The commented-out code that follows is not executed: it trains, inspects, saves and\n",
+ "# reloads a RandomForestClassifier on a toy DataFrame and is kept for reference only.\n",
+ "import numpy\n",
+ "from numpy import allclose\n",
+ "from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel\n",
+ "from pyspark.ml.feature import StringIndexer\n",
+ "from pyspark.ml.linalg import Vectors\n",
+ "# df = spark.createDataFrame([\n",
+ "# (1.0, Vectors.dense(1.0)),\n",
+ "# (0.0, Vectors.sparse(1, [], []))], [\"label\", \"features\"])\n",
+ "\n",
+ "# stringIndexer = StringIndexer(inputCol=\"labels\", outputCol=\"indexed\")\n",
+ "# si_model = stringIndexer.fit(df)\n",
+ "# td = si_model.transform(df)\n",
+ "# rf = RandomForestClassifier(numTrees=3, maxDepth=2, labelCol=\"indexed\", seed=42,\n",
+ "# leafCol=\"leafId\")\n",
+ "# rf.getMinWeightFractionPerNode()\n",
+ "\n",
+ "# model = rf.fit(td)\n",
+ "# model.getLabelCol()\n",
+ "\n",
+ "# model.setFeaturesCol(\"features\")\n",
+ "\n",
+ "# model.setRawPredictionCol(\"newRawPrediction\")\n",
+ "\n",
+ "# model.getBootstrap()\n",
+ "\n",
+ "# model.getRawPredictionCol()\n",
+ "\n",
+ "# model.featureImportances\n",
+ "\n",
+ "# allclose(model.treeWeights, [1.0, 1.0, 1.0])\n",
+ "\n",
+ "# test0 = spark.createDataFrame([(Vectors.dense(-1.0),)], [\"features\"])\n",
+ "# model.predict(test0.head().features)\n",
+ "\n",
+ "# model.predictRaw(test0.head().features)\n",
+ "\n",
+ "# model.predictProbability(test0.head().features)\n",
+ "\n",
+ "# result = model.transform(test0).head()\n",
+ "# result.prediction\n",
+ "\n",
+ "# numpy.argmax(result.probability)\n",
+ "\n",
+ "# numpy.argmax(result.newRawPrediction)\n",
+ "\n",
+ "# result.leafId\n",
+ "\n",
+ "# test1 = spark.createDataFrame([(Vectors.sparse(1, [0], [1.0]),)], [\"features\"])\n",
+ "# model.transform(test1).head().prediction\n",
+ "\n",
+ "# model.trees\n",
+ "# temp_path= 'hdfs://776faf4d6a1e:8020/teste'\n",
+ "# rfc_path = temp_path + \"/rfc\"\n",
+ "# rf.save(rfc_path)\n",
+ "# rf2 = RandomForestClassifier.load(rfc_path)\n",
+ "# rf2.getNumTrees()\n",
+ "\n",
+ "# model_path = temp_path + \"/rfc_model\"\n",
+ "# model.save(model_path)\n",
+ "# model2 = RandomForestClassificationModel.load(model_path)\n",
+ "# model.featureImportances == model2.featureImportances\n",
+ "\n",
+ "# model.transform(test0).take(1) == model2.transform(test0).take(1)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "id": "9679e0f3-8ad5-47b4-9dc5-daddd55e2ab4",
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "id": "e20dac4d-1a4a-4cf8-a674-a32af03184d6",
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Uncomment to shut down the Spark session once the notebook is finished.\n",
+ "# spark.stop()"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.2"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}