You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2021/06/17 09:36:28 UTC
[incubator-sedona] branch master updated: [SEDONA-30] Add raster
data support in Sedona SQL (#523)
This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git
The following commit(s) were added to refs/heads/master by this push:
new 8fd688f [SEDONA-30] Add raster data support in Sedona SQL (#523)
8fd688f is described below
commit 8fd688f4c26374bf2f4811d1dc4c333d9acd3b4d
Author: shantanuaggarwal2695 <sh...@gmail.com>
AuthorDate: Thu Jun 17 02:36:18 2021 -0700
[SEDONA-30] Add raster data support in Sedona SQL (#523)
Co-authored-by: Jia Yu <ji...@gmail.com>
Co-authored-by: Jia Yu <ji...@apache.org>
---
.gitignore | 2 +-
binder/ApacheSedonaRaster_1.ipynb | 871 +++++++++++++++++++++
binder/data/raster/T21HUB_4704_4736_8224_8256.tif | Bin 0 -> 6619 bytes
.../data/raster/vya_T21HUB_992_1024_4352_4384.tif | Bin 0 -> 7689 bytes
core/src/test/resources/raster/test1.tiff | Bin 0 -> 174803 bytes
core/src/test/resources/raster/test2.tiff | Bin 0 -> 174803 bytes
docs/api/sql/Constructor.md | 2 +-
docs/api/sql/Raster-loader.md | 144 ++++
docs/api/sql/Raster-operators.md | 309 ++++++++
docs/tutorial/raster.md | 19 +
mkdocs.yml | 15 +-
pom.xml | 13 +
python-adapter/.gitignore | 3 +-
python-adapter/pom.xml | 13 +
spark-version-converter.py | 3 +-
...org.apache.spark.sql.sources.DataSourceRegister | 1 +
.../scala/org/apache/sedona/sql/UDF/Catalog.scala | 31 +-
.../sql/sedona_sql/expressions/Functions.scala | 3 +-
.../sedona_sql/expressions/raster/Functions.scala | 865 ++++++++++++++++++++
.../sql/sedona_sql/expressions/raster/IO.scala | 256 ++++++
.../sql/sedona_sql/io/GeotiffFileFormat.scala | 104 +++
.../spark/sql/sedona_sql/io/GeotiffSchema.scala | 195 +++++
.../spark/sql/sedona_sql/io/HadoopUtils.scala | 107 +++
.../spark/sql/sedona_sql/io/ImageOptions.scala | 12 +
.../org/apache/sedona/sql/TestBaseScala.scala | 1 +
.../scala/org/apache/sedona/sql/rasterIOTest.scala | 79 ++
.../org/apache/sedona/sql/rasteralgebraTest.scala | 208 +++++
27 files changed, 3244 insertions(+), 12 deletions(-)
diff --git a/.gitignore b/.gitignore
index 4ced234..2eee0df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,4 @@
/site/
/.bloop/
/.metals/
-/.vscode/
\ No newline at end of file
+/.vscode/
diff --git a/binder/ApacheSedonaRaster_1.ipynb b/binder/ApacheSedonaRaster_1.ipynb
new file mode 100644
index 0000000..1220a51
--- /dev/null
+++ b/binder/ApacheSedonaRaster_1.ipynb
@@ -0,0 +1,871 @@
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import findspark\n",
+ "findspark.init()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "from IPython.display import display, HTML\n",
+ "from pyspark.sql import SparkSession\n",
+ "from pyspark import StorageLevel\n",
+ "import pandas as pd\n",
+ "from pyspark.sql.types import StructType, StructField,StringType, LongType, IntegerType, DoubleType, ArrayType\n",
+ "from pyspark.sql.functions import regexp_replace\n",
+ "from sedona.register import SedonaRegistrator\n",
+ "from sedona.utils import SedonaKryoRegistrator, KryoSerializer\n",
+ "from pyspark.sql.functions import col, split, expr\n",
+ "from pyspark.sql.functions import udf, lit\n",
+ "from sedona.utils import SedonaKryoRegistrator, KryoSerializer\n",
+ "from pyspark.sql.functions import col, split, expr\n",
+ "from pyspark.sql.functions import udf, lit\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Create Spark Session for application"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "spark = SparkSession.\\\n",
+ " builder.\\\n",
+ " master(\"local[*]\").\\\n",
+ " appName(\"Demo-app\").\\\n",
+ " config(\"spark.serializer\", KryoSerializer.getName).\\\n",
+ " config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName) .\\\n",
+ " config(\"spark.executor.cores\", 3) .\\\n",
+ " config(\"spark.driver.memory\", \"4G\") .\\\n",
+ " config(\"spark.kryoserializer.buffer.max.value\", \"4096\") .\\\n",
+ " config(\"spark.sql.crossJoin.enabled\", \"true\") .\\\n",
+ " getOrCreate()\n",
+ "\n",
+ "SedonaRegistrator.registerAll(spark)\n",
+ "sc = spark.sparkContext\n"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Geotiff Loader \n",
+ "\n",
+ "1. Loader takes as input a path to a directory which contains geotiff files or a path to a particular geotiff file\n",
+ "2. Loader will read each geotiff image into a struct named image, which contains multiple fields (shown in the schema below) that can be extracted using Spark SQL"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# Path to directory of geotiff images \n",
+ "DATA_DIR = \"./data/raster/\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {
+ "scrolled": true
+ },
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "root\n",
+ " |-- image: struct (nullable = true)\n",
+ " | |-- origin: string (nullable = true)\n",
+ " | |-- Geometry: geometry (nullable = true)\n",
+ " | |-- height: integer (nullable = true)\n",
+ " | |-- width: integer (nullable = true)\n",
+ " | |-- nBands: integer (nullable = true)\n",
+ " | |-- data: array (nullable = true)\n",
+ " | | |-- element: double (containsNull = true)\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df = spark.read.format(\"geotiff\").option(\"dropInvalid\",True).load(DATA_DIR)\n",
+ "df.printSchema()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+ "| origin| Geom|height|width| data|bands|\n",
+ "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+ "|file:///home/hp/D...|POLYGON ((-58.702...| 32| 32|[1081.0, 1068.0, ...| 4|\n",
+ "|file:///home/hp/D...|POLYGON ((-58.286...| 32| 32|[1151.0, 1141.0, ...| 4|\n",
+ "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "df = df.selectExpr(\"image.origin as origin\",\"image.Geometry as Geom\", \"image.height as height\", \"image.width as width\", \"image.data as data\", \"image.nBands as bands\")\n",
+ "df.show(5)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Extract a particular band from geotiff dataframe using RS_GetBand()\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "| Geom| Band1| Band2| Band3| Band4|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "|POLYGON ((-58.702...|[1081.0, 1068.0, ...|[865.0, 1084.0, 1...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|\n",
+ "|POLYGON ((-58.286...|[1151.0, 1141.0, ...|[1197.0, 1163.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_GetBand() will fetch a particular band from the given data array, which is the concatenation of all the bands'''\n",
+ "\n",
+ "df = df.selectExpr(\"Geom\",\"RS_GetBand(data, 1,bands) as Band1\",\"RS_GetBand(data, 2,bands) as Band2\",\"RS_GetBand(data, 3,bands) as Band3\", \"RS_GetBand(data, 4,bands) as Band4\")\n",
+ "df.createOrReplaceTempView(\"allbands\")\n",
+ "df.show(5)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Map Algebra operations on band values "
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 8,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| normDiff|\n",
+ "+--------------------+\n",
+ "|[-0.11, 0.01, 0.0...|\n",
+ "|[0.02, 0.01, -0.0...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_NormalizedDifference can be used to calculate NDVI for a particular geotiff image since it uses the same computational formula as NDVI'''\n",
+ "\n",
+ "NomalizedDifference = df.selectExpr(\"RS_NormalizedDifference(Band1, Band2) as normDiff\")\n",
+ "NomalizedDifference.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-------+\n",
+ "| mean|\n",
+ "+-------+\n",
+ "|1153.85|\n",
+ "|1293.77|\n",
+ "+-------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_Mean() can be used to calculate the mean of pixel values in a particular spatial band '''\n",
+ "meanDF = df.selectExpr(\"RS_Mean(Band1) as mean\")\n",
+ "meanDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+----------------+\n",
+ "| mode|\n",
+ "+----------------+\n",
+ "| [1011.0, 927.0]|\n",
+ "|[1176.0, 1230.0]|\n",
+ "+----------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "\"\"\" RS_Mode() is used to calculate the mode in an array of pixels and returns an array of doubles, of size 1 when the mode is unique\"\"\"\n",
+ "modeDF = df.selectExpr(\"RS_Mode(Band1) as mode\")\n",
+ "modeDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| greaterthan|\n",
+ "+--------------------+\n",
+ "|[1.0, 1.0, 1.0, 0...|\n",
+ "|[1.0, 1.0, 1.0, 1...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_GreaterThan() is used to mask all the values with 1 which are greater than a particular threshold'''\n",
+ "greaterthanDF = spark.sql(\"Select RS_GreaterThan(Band1,1000.0) as greaterthan from allbands\")\n",
+ "greaterthanDF.show()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 15,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| greaterthanEqual|\n",
+ "+--------------------+\n",
+ "|[1.0, 1.0, 1.0, 1...|\n",
+ "|[1.0, 1.0, 1.0, 1...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_GreaterThanEqual() is used to mask all the values with 1 which are greater than or equal to a particular threshold'''\n",
+ "\n",
+ "greaterthanEqualDF = spark.sql(\"Select RS_GreaterThanEqual(Band1,360.0) as greaterthanEqual from allbands\")\n",
+ "greaterthanEqualDF.show()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 16,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| lessthan|\n",
+ "+--------------------+\n",
+ "|[0.0, 0.0, 0.0, 1...|\n",
+ "|[0.0, 0.0, 0.0, 0...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_LessThan() is used to mask all the values with 1 which are less than a particular threshold'''\n",
+ "lessthanDF = spark.sql(\"Select RS_LessThan(Band1,1000.0) as lessthan from allbands\")\n",
+ "lessthanDF.show()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 17,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| lessthanequal|\n",
+ "+--------------------+\n",
+ "|[1.0, 1.0, 1.0, 1...|\n",
+ "|[1.0, 1.0, 1.0, 1...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_LessThanEqual() is used to mask all the values with 1 which are less than or equal to a particular threshold'''\n",
+ "lessthanEqualDF = spark.sql(\"Select RS_LessThanEqual(Band1,2890.0) as lessthanequal from allbands\")\n",
+ "lessthanEqualDF.show()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 18,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| sumOfBand|\n",
+ "+--------------------+\n",
+ "|[1946.0, 2152.0, ...|\n",
+ "|[2348.0, 2304.0, ...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_AddBands() can add two spatial bands together'''\n",
+ "sumDF = df.selectExpr(\"RS_AddBands(Band1, Band2) as sumOfBand\")\n",
+ "sumDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 19,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| diffOfBand|\n",
+ "+--------------------+\n",
+ "|[-216.0, 16.0, 11...|\n",
+ "|[46.0, 22.0, -96....|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_SubtractBands() can subtract two spatial bands together'''\n",
+ "subtractDF = df.selectExpr(\"RS_SubtractBands(Band1, Band2) as diffOfBand\")\n",
+ "subtractDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 20,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| productOfBand|\n",
+ "+--------------------+\n",
+ "|[935065.0, 115771...|\n",
+ "|[1377747.0, 13269...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_MultiplyBands() can multiply two bands together'''\n",
+ "multiplyDF = df.selectExpr(\"RS_MultiplyBands(Band1, Band2) as productOfBand\")\n",
+ "multiplyDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 21,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| divisionOfBand|\n",
+ "+--------------------+\n",
+ "|[1.25, 0.99, 0.9,...|\n",
+ "|[0.96, 0.98, 1.08...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_DivideBands() can divide two bands together'''\n",
+ "divideDF = df.selectExpr(\"RS_DivideBands(Band1, Band2) as divisionOfBand\")\n",
+ "divideDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 22,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| target|\n",
+ "+--------------------+\n",
+ "|[1730.0, 2168.0, ...|\n",
+ "|[2394.0, 2326.0, ...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_MultiplyFactor() will multiply a factor to a spatial band'''\n",
+ "mulfacDF = df.selectExpr(\"RS_MultiplyFactor(Band2, 2) as target\")\n",
+ "mulfacDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 23,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| AND|\n",
+ "+--------------------+\n",
+ "|[33.0, 1068.0, 10...|\n",
+ "|[1069.0, 1025.0, ...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_BitwiseAND() will return AND between two values of Bands'''\n",
+ "bitwiseAND = df.selectExpr(\"RS_BitwiseAND(Band1, Band2) as AND\")\n",
+ "bitwiseAND.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| OR|\n",
+ "+--------------------+\n",
+ "|[1913.0, 1084.0, ...|\n",
+ "|[1279.0, 1279.0, ...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_BitwiseOR() will return OR between two values of Bands'''\n",
+ "bitwiseOR = df.selectExpr(\"RS_BitwiseOR(Band1, Band2) as OR\")\n",
+ "bitwiseOR.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 25,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+-----+\n",
+ "|count|\n",
+ "+-----+\n",
+ "| 753|\n",
+ "| 1017|\n",
+ "+-----+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_Count() will calculate the total number of occurrences of a target value'''\n",
+ "countDF = df.selectExpr(\"RS_Count(RS_GreaterThan(Band1,1000.0), 1.0) as count\")\n",
+ "countDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 26,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| modulo|\n",
+ "+--------------------+\n",
+ "|[10.0, 18.0, 18.0...|\n",
+ "|[17.0, 7.0, 2.0, ...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_Modulo() will calculate the modulus of band value with respect to a given number'''\n",
+ "moduloDF = df.selectExpr(\"RS_Modulo(Band1, 21.0) as modulo \")\n",
+ "moduloDF.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| root|\n",
+ "+--------------------+\n",
+ "|[32.88, 32.68, 32...|\n",
+ "|[33.93, 33.78, 35...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_SquareRoot() will calculate the square root of all the band values up to two decimal places'''\n",
+ "rootDF = df.selectExpr(\"RS_SquareRoot(Band1) as root\")\n",
+ "rootDF.show(5)\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 28,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| loggDifference|\n",
+ "+--------------------+\n",
+ "|[1081.0, 1068.0, ...|\n",
+ "|[1151.0, 1141.0, ...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_LogicalDifference() will return value from band1 if value at that particular location is not equal to band2 else it will return 0'''\n",
+ "logDiff = df.selectExpr(\"RS_LogicalDifference(Band1, Band2) as loggDifference\")\n",
+ "logDiff.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 29,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+\n",
+ "| logicalOver|\n",
+ "+--------------------+\n",
+ "|[865.0, 1084.0, 1...|\n",
+ "|[1197.0, 1163.0, ...|\n",
+ "+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' RS_LogicalOver() will iterate over two bands and return value of first band if it is not equal to 0 else it will return value from the latter band'''\n",
+ "logOver = df.selectExpr(\"RS_LogicalOver(Band3, Band2) as logicalOver\")\n",
+ "logOver.show(5)"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# Visualising Geotiff Images\n",
+ "\n",
+ "1. Normalize the bands in range [0-255] if values are greater than 255\n",
+ "2. Process image using RS_Base64() which converts it into a base64 string\n",
+ "3. Embed results of RS_Base64() in RS_HTML() to embed into IPython notebook\n",
+ "4. Process results of RS_HTML() as below:"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "| Geom| RedBand| BlueBand| GreenBand| CombinedBand|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "|POLYGON ((-58.702...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|\n",
+ "|POLYGON ((-58.286...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|\n",
+ "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' Plotting images as a dataframe using geotiff Dataframe.'''\n",
+ "\n",
+ "df = spark.read.format(\"geotiff\").option(\"dropInvalid\",True).load(DATA_DIR)\n",
+ "df = df.selectExpr(\"image.origin as origin\",\"image.Geometry as Geom\", \"image.height as height\", \"image.width as width\", \"image.data as data\", \"image.nBands as bands\")\n",
+ "\n",
+ "df = df.selectExpr(\"RS_GetBand(data,1,bands) as targetband\", \"height\", \"width\", \"bands\", \"Geom\")\n",
+ "df_base64 = df.selectExpr(\"Geom\", \"RS_Base64(height,width,RS_Normalize(targetBand), RS_Array(height*width,0), RS_Array(height*width, 0)) as red\",\"RS_Base64(height,width,RS_Array(height*width, 0), RS_Normalize(targetBand), RS_Array(height*width, 0)) as green\", \"RS_Base64(height,width,RS_Array(height*width, 0), RS_Array(height*width, 0), RS_Normalize(targetBand)) as blue\",\"RS_Base64(height,width,RS_Normalize(targetBand), RS_Normalize(targetBand),RS_Normalize(targetBand)) as [...]
+ "df_HTML = df_base64.selectExpr(\"Geom\",\"RS_HTML(red) as RedBand\",\"RS_HTML(blue) as BlueBand\",\"RS_HTML(green) as GreenBand\", \"RS_HTML(RGB) as CombinedBand\")\n",
+ "df_HTML.show(5)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>Geom</th>\n",
+ " <th>RedBand</th>\n",
+ " <th>BlueBand</th>\n",
+ " <th>GreenBand</th>\n",
+ " <th>CombinedBand</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>POLYGON ((-58.70271939504448 -34.41877544555479, -58.70277605822864 -34.42156988068061, -58.6994039180242 -34.42161679331493, -58.69934736692278 -34.4188223533111, -58.70271939504448 -34.41877544555479))</td>\n",
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFGElEQVR42l2X2W4jRRSG+zVhEu9b2+6O492xE8eJE8eZZLIxZNaISZgJIAHSSGwSFyziDoRAvABPgMQVQkhDneg70q+5+F3d5aqz/Gep6qgVRW/WAhoBpYA6Y5XnWkCRNb2AecCDgIuA44A44LOAPwM+DeiwZxU5hnxAhTGPbJsvB0Td8DNCuG3eFMWTAPs/hwH7rH0d8DTgVcAQPAm4ClgieAXDDRmQR1aeeTM+Mk8GojTBkHWQsNCepwGLgK2AMc87KP084N+A31ifBRnYyPGeYy7rDMyg3wTdBuyhbChGbDCOWechM3YOAt4L+CHgb8Y6jC6g2z1eQbmPZkw0QKCN3yDQvD5BiC38ACVbGLZLOM4ZzYlDDDWvUgy9YH0snq9iQAl5k [...]
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFHklEQVR42l2X2W4jRRSG/ZowifetbbfttPexY8exsziTTBYzBGaJSMJMAIlBGolN4oJF3IEQiBfgCZC4QgjJVDXf8fyai9/VXa46y3+Wqk4kEs1VItFwqDrkHSqMJZ7LDjnWdBzmDg8dlg4nDoHDS4c/HD51aLFnEzkeGYciYwbZfr7gkGi7nwHC/eZtUTxa/f9/GgP2WfvK4YnDC4c+eOxw5bBA8AaGeyRBBlkZ5r3xsSc9URpiyBYIWeifJw4HDmOHIc+7KP3M4R+HX1mfAknYSPOeZi5lDEyh3wu6c9hDWV+MuM84ZJ2FzLNz6PCOw/cOfzFWYPQAus3jDZTb6I2JvW/AwtcI9F6fIsQv/AAlYwybEY4LRu/EEYZ6r+oYumR9IJ5vY [...]
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFGElEQVR42l2X2W4jRRSG/ZpA4i3euu1uO+09duw4ceI4k0w2hswaMQlDAIlBGolN4oJF3IEQiBfgCZC4QghpqGq+Y/2ai9/VXa46y3+Wqs5kWpnXmaZD3aHkEDJWea45bLCm6zB3uO9w6XDiEDi8dPjD4VOHNnvWkeNRcKgwFpDt58sOmY77GSLcb94WxWMH/38eAw5Y+8rhicOHDgPw2OHaYYngNQz3yIICsgrMe+NTT/qiNMKQTRCx0D9PHRYOE4cRz7so/czhH4dfWZ8DWdjI855nLmcMzKDfC7pz2EfZQIzYYhyxzkLm2Tl0eNfhe4e/GEMYXUC3ebyGchu9Man3TVj4GoHe61OE+IXvo2SCYXuE44LRO3GEod6rGEMvWR+I5+sYU [...]
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAGOElEQVR42k3XWUtXURQF8Ps1s3kwLRvNBm3O5kEtK0srmq3AgqAJemigtyIKv0CfIPBJJDjx27BE4Xj/99xz915r7eGc2+3Zs6ft3r27DQwMtC1btrTt27fXta+vr3739/e3zZs315r9+/e306dPt7GxsTY5OdmuXLnStm3b1l6+fNn+/PnT5ubm2r59++qdtWvXlh1j48aNbevWrXU12Dbf29vbuqGhoTY8PFzGvXz06NFlx0eOHGmeb9iwoQCcPXu21r5+/brNzMy0p0+ftkOHDtWYnp5u9+7daxcuXCjDa9asKeDGunXranDOlqt54DtMDh48uOx0586dBWTv3r013Fvo9/Hjx9u5c+fasWPH2uHDh+v3qVOnyumrV6/a0tJS+/XrV [...]
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>POLYGON ((-58.28663657626114 -34.75858090620287, -58.28667994174946 -34.76137571668496, -58.28329340123002 -34.76141146033393, -58.28325014980316 -34.75861664615162, -58.28663657626114 -34.75858090620287))</td>\n",
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAE+UlEQVR42l2XS4/bVBTH/R0p08nDieOx4zhO7LxfM5lH0xnoY9qhtKgMagEJVd0hIbaw4FuwR2zY8QXKvdHvoL+yOLF977nn/M/7JoiD4FPHUelo4Shx9NBR31HlqO3I8zQcHTtqOooctXgv2D9iP2Yt4sxDqIGsnaO/HL1ylDsKUgCkgMgR3GYtFiFeQR3lkfCMHPXg9We7Yog/EwIgRMczzpx4AP5n4miA1TnUEvKHP3P0AEsNkFc0dnTr6AVgauyF4oUO3976P7De8w49gDkAEhDFAEhBn/B+JACOAeQ9cO7oFMtCeI4B0YLfQNw7+hUA3uAbDUEf97UAMCGWU/bash/KM5LnhLMNqH4QzgGhWiDXey2owdjHpRFCvGc2vO842IGar [...]
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAE/ElEQVR42l2XyXLcVBSG+x0JjntQS62WWq0epJ4nu2M7HZuExCEJQwVTCVBFUeyootjCgrdgT7FhxwuYe5XvmL+8OC3p3jP8Z7y3a7VafFurdRwVjlaOEkcPHQ0clY7C2w88TUfHjlqOIkdt3ofsH7EfsxYh8xBqouvg6C9HbxzljmopAFJA5CgOWYtFiTfQwHgkPBNHfXi9bE8c8TIBAAJsPEem6wH4n5mjEV7nUFvIC3/k6AGeGiBvaOro2tFLwNTZCyQKHb6993/gvecdewBLACQgigGQgj7h/UgAHAPIR+CRoxM8C+A5BkQbfgNx4+hXAHiHrzQFA8LXBsCMXM7ZC2U/kGckzxmyTahxL50jUrVCr49aFbIGRnoo8kp8ZHa8HxDsQ [...]
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAE+ElEQVR42l2XyW7bVhSG9Y5NHWugSFGkKGogNU+2YjuK3aSJ3SQdkLpomgBFkF2Bott20bfovuimu75Aei/xHeOHF0ck7z3Df8Z7VavFtU+1jqPC0cpR4uiho4Gj0lHoyPM0HR07ajmKHLV5H7J/xH7MWoTMQ6iJroOjvx29dpQ7qqUASAGRozhkLRYl3kAD45HwTBz14fWyPXHEywQACLDxHJmuB+B/Zo5GeJ1DbSEv/JmjB3hqgLyhqaNrRzeAqbMXSBQ6fHvv/8R7zzv2AJYASEAUAyAFfcL7kQA4BpCPwCNHJ3gWwHMMiDb8BuLW0W8A8A5faQoGhK8NgBm5nLMXyn4gz0ieM2SbUONeOkekaoVeH7UqZA2M9FDklfjI7Hg/INiBW [...]
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAF7klEQVR42k3XyU5WSxQF4POOKCooSmODqKggNoCCiA2tghqUAJIY44zEMNWBb8HcOGHmC9TNt5PFhaSoU1W7Wbut+ruhoaE2ODjY7ty506amptrIyEg7d+5cu3nzZhsfH2+XL19uaPr7+9v58+fbxYsX25UrV9rAwEB9j42N1Xlvb2+d+7aHBg9Zhm+yFhYW2p8/f9rGxkYbHR1t3dWrVwuAGQibBCO2R2CEUNDX11fKKQjN3bt3240bN4oW77Vr104MwXPp0qUCYKZjeXm5eIaHh1vn3/3799utW7fKagAMSjIw9/T0tDNnzpSlAUTRvXv32urqaltbWyswFy5cqDPK4gUGWrP+9+/fZT3a27dvt+7BgwcFAGJgWAEAAujt+6Y4AOwDx [...]
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "display(HTML(df_HTML.limit(2).toPandas().to_html(escape=False)))"
+ ]
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {},
+ "source": [
+ "# User can also create some UDF manually to manipulate Geotiff dataframes"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
+ "+------+\n",
+ "| sum|\n",
+ "+------+\n",
+ "| 753.0|\n",
+ "|1017.0|\n",
+ "+------+\n",
+ "\n"
+ ]
+ }
+ ],
+ "source": [
+ "''' Sample UDF counts the values in a band which are greater than 1000.0 (it adds 1 per matching value, so the result is a count rather than a sum) '''\n",
+ "\n",
+ "def SumOfValues(band):\n",
+ " total = 0.0\n",
+ " for num in band:\n",
+ " if num>1000.0:\n",
+ " total+=1\n",
+ " return total\n",
+ " \n",
+ "calculateSum = udf(SumOfValues, DoubleType())\n",
+ "spark.udf.register(\"RS_Sum\", calculateSum)\n",
+ "\n",
+ "sumDF = df.selectExpr(\"RS_Sum(targetband) as sum\")\n",
+ "sumDF.show()"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<table border=\"1\" class=\"dataframe\">\n",
+ " <thead>\n",
+ " <tr style=\"text-align: right;\">\n",
+ " <th></th>\n",
+ " <th>Geom</th>\n",
+ " <th>selectedregion</th>\n",
+ " </tr>\n",
+ " </thead>\n",
+ " <tbody>\n",
+ " <tr>\n",
+ " <th>0</th>\n",
+ " <td>POLYGON ((-58.70271939504448 -34.41877544555479, -58.70277605822864 -34.42156988068061, -58.6994039180242 -34.42161679331493, -58.69934736692278 -34.4188223533111, -58.70271939504448 -34.41877544555479))</td>\n",
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFlklEQVR42l2Xa28bRRSG529CXV/XG6/t9S1rx/c4iZPGaS6N3RBK2oiGSwCJVqpEAYkPBcQ3EALxB/gFSHxCCGk5M/OMM+qH1zO73p1zznvec2ZWtZRSTUFNEAiqjBvMK4ISz3QFc6XSU8FKcCKIBC8Efwq+FGwK5J30viAABUEo7xbABjbKApUIBiwuL6uJZ3jM/3kcOLDPpq8EV4JPBX3wgeBasBDIwmlGUAJZC2M8z6iDirQDEona8ozGONIGMQ/q+bbggSw2FYwEer6L0ZeCfwW/wUoOZGFDG87hQJa5YWAH+mUhdSvYx1jfc2LIOLLPpcJGKu+kwk56KHgs+EHwN6MEk3ZxcIMU6IgzGHejdsZE32T8VnBI1GekRT/4oUCMqKl1L [...]
+ " </tr>\n",
+ " <tr>\n",
+ " <th>1</th>\n",
+ " <td>POLYGON ((-58.28663657626114 -34.75858090620287, -58.28667994174946 -34.76137571668496, -58.28329340123002 -34.76141146033393, -58.28325014980316 -34.75861664615162, -58.28663657626114 -34.75858090620287))</td>\n",
+ " <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFfklEQVR42k2XSXPbRhCF8R/tSOJOQiRBECTBTaRILdZiybJjKbacpGynvKQqlcotVSlfk0P+Re6pXHLLH0C6e74R5vAEgFj69evXPaMojqKoI5gIVoKuYFcwFOSClkCfqQn2BPUoKtqCpkDPM4HcL3YEe5xnPCPvFLugxrcuBX8L7gWpIOpBoAeJ1AWxh3sE3wVKoEpwDSDPFPJMMRUMCK6k+oIugZVUAwINYnwtkHei/Yg/c8GIrFPQDKAvPxI8FuwEWWmgmeBWcAeZCvcagQodd23Z/0n2mtxYCRxAoAuZGAI9Mu5yvhMQ0KweocCp4EgwIehj7u+i1E5JInov+AIBTfg6LMEQ6ZsQUFJSy2jBvVZ53wI1gyz9Ud4pUgIqqpSEUlnQA [...]
+ " </tr>\n",
+ " </tbody>\n",
+ "</table>"
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "''' Sample UDF to visualize a particular region of a GeoTiff image'''\n",
+ "\n",
+ "def generatemask(band, width,height):\n",
+ " for (i,val) in enumerate(band):\n",
+ " if (i%width>=12 and i%width<26) and (i%height>=12 and i%height<26):\n",
+ " band[i] = 255.0\n",
+ " else:\n",
+ " band[i] = 0.0\n",
+ " return band\n",
+ "\n",
+ "maskValues = udf(generatemask, ArrayType(DoubleType()))\n",
+ "spark.udf.register(\"RS_MaskValues\", maskValues)\n",
+ "\n",
+ "\n",
+ "df_base64 = df.selectExpr(\"Geom\", \"RS_Base64(height,width,RS_Normalize(targetband), RS_Array(height*width,0), RS_Array(height*width, 0), RS_MaskValues(targetband,width,height)) as region\" )\n",
+ "df_HTML = df_base64.selectExpr(\"Geom\",\"RS_HTML(region) as selectedregion\")\n",
+ "display(HTML(df_HTML.limit(2).toPandas().to_html(escape=False)))\n"
+ ]
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "apache-sedona",
+ "language": "python",
+ "name": "apache-sedona"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.7.5"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/binder/data/raster/T21HUB_4704_4736_8224_8256.tif b/binder/data/raster/T21HUB_4704_4736_8224_8256.tif
new file mode 100644
index 0000000..776d753
Binary files /dev/null and b/binder/data/raster/T21HUB_4704_4736_8224_8256.tif differ
diff --git a/binder/data/raster/vya_T21HUB_992_1024_4352_4384.tif b/binder/data/raster/vya_T21HUB_992_1024_4352_4384.tif
new file mode 100644
index 0000000..6df2b0a
Binary files /dev/null and b/binder/data/raster/vya_T21HUB_992_1024_4352_4384.tif differ
diff --git a/core/src/test/resources/raster/test1.tiff b/core/src/test/resources/raster/test1.tiff
new file mode 100644
index 0000000..bebd682
Binary files /dev/null and b/core/src/test/resources/raster/test1.tiff differ
diff --git a/core/src/test/resources/raster/test2.tiff b/core/src/test/resources/raster/test2.tiff
new file mode 100644
index 0000000..bebd682
Binary files /dev/null and b/core/src/test/resources/raster/test2.tiff differ
diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md
index d456d1f..9544488 100644
--- a/docs/api/sql/Constructor.md
+++ b/docs/api/sql/Constructor.md
@@ -184,4 +184,4 @@ Spark SQL example:
SELECT *
FROM pointdf
WHERE ST_Contains(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.pointshape)
-```
+```
\ No newline at end of file
diff --git a/docs/api/sql/Raster-loader.md b/docs/api/sql/Raster-loader.md
new file mode 100644
index 0000000..f4197b1
--- /dev/null
+++ b/docs/api/sql/Raster-loader.md
@@ -0,0 +1,144 @@
+### Geotiff Dataframe Loader
+
+Introduction: The GeoTiff loader of Sedona is a Spark built-in data source. It can read a single geotiff image or
+a number of geotiff images into a DataFrame.
+
+Since: `v1.1.0`
+
+Spark SQL example:
+
+The input path could be a path to a single GeoTiff image or a directory of GeoTiff images.
+ You can optionally append an option to drop invalid images. The geometry bound of each image is automatically loaded
+as a Sedona geometry and is transformed to WGS84 (EPSG:4326) reference system.
+
+```Scala
+var geotiffDF = sparkSession.read.format("geotiff").option("dropInvalid", true).load("YOUR_PATH")
+geotiffDF.printSchema()
+```
+
+Output:
+
+```html
+ |-- image: struct (nullable = true)
+ | |-- origin: string (nullable = true)
+ | |-- Geometry: geometry (nullable = true)
+ | |-- height: integer (nullable = true)
+ | |-- width: integer (nullable = true)
+ | |-- nBands: integer (nullable = true)
+ | |-- data: array (nullable = true)
+ | | |-- element: double (containsNull = true)
+```
+
+You can also select sub-attributes individually to construct a new DataFrame
+
+```Scala
+geotiffDF = geotiffDF.selectExpr("image.origin as origin","image.Geometry as Geom", "image.height as height", "image.width as width", "image.data as data", "image.nBands as bands")
+geotiffDF.createOrReplaceTempView("GeotiffDataframe")
+geotiffDF.show()
+```
+
+Output:
+
+```html
++--------------------+--------------------+------+-----+--------------------+-----+
+| origin| Geom|height|width| data|bands|
++--------------------+--------------------+------+-----+--------------------+-----+
+|file:///home/hp/D...|POLYGON ((-58.699...| 32| 32|[1058.0, 1039.0, ...| 4|
+|file:///home/hp/D...|POLYGON ((-58.297...| 32| 32|[1258.0, 1298.0, ...| 4|
++--------------------+--------------------+------+-----+--------------------+-----+
+```
+
+## RS_GetBand
+
+Introduction: Return a particular band from Geotiff Dataframe
+
+The number of total bands can be obtained from the GeoTiff loader
+
+Format: `RS_GetBand (allBandValues: Array[Double], targetBand:Int, totalBands:Int)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+
+```Scala
+val BandDF = spark.sql("select RS_GetBand(data, 2, bands) as targetBand from GeotiffDataframe")
+BandDF.show()
+```
+
+Output:
+
+```html
++--------------------+
+| targetBand|
++--------------------+
+|[1058.0, 1039.0, ...|
+|[1258.0, 1298.0, ...|
++--------------------+
+```
+
+## RS_Array
+
+Introduction: Create an array that is filled by the given value
+
+Format: `RS_Array(length:Int, value: Decimal)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+
+```Scala
+SELECT RS_Array(height * width, 0.0)
+```
+
+## RS_Base64
+
+Introduction: Return a Base64 String from a geotiff image
+
+Format: `RS_Base64 (height:Int, width:Int, redBand: Array[Double], greenBand: Array[Double], blueBand: Array[Double],
+optional: alphaBand: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+val BandDF = spark.sql("select RS_Base64(h, w, band1, band2, RS_Array(h*w, 0)) as baseString from dataframe")
+BandDF.show()
+```
+
+Output:
+
+```html
++--------------------+
+| baseString|
++--------------------+
+|QJCIAAAAAABAkDwAA...|
+|QJOoAAAAAABAlEgAA...|
++--------------------+
+```
+
+!!!note
+ Although the 3 RGB bands are mandatory, you can use [RS_Array(h*w, 0.0)](#rs_array) to create an array (zeroed out, size = h * w) as input.
+
+## RS_HTML
+
+Introduction: Return a html img tag with the base64 string embedded
+
+Format: `RS_HTML(base64:String, optional: width_in_px:String)`
+
+Spark SQL example:
+
+```Scala
+df.selectExpr("RS_HTML(encodedstring, '300') as htmlstring" ).show()
+```
+
+Output:
+
+```html
++--------------------+
+| htmlstring|
++--------------------+
+|<img src="data:im...|
+|<img src="data:im...|
++--------------------+
+```
+
diff --git a/docs/api/sql/Raster-operators.md b/docs/api/sql/Raster-operators.md
new file mode 100644
index 0000000..a401ba0
--- /dev/null
+++ b/docs/api/sql/Raster-operators.md
@@ -0,0 +1,309 @@
+## RS_AddBands
+
+Introduction: Add two spectral bands in a Geotiff image
+
+Format: `RS_AddBands (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val sumDF = spark.sql("select RS_AddBands(band1, band2) as sumOfBands from dataframe")
+
+```
+
+## RS_SubtractBands
+
+Introduction: Subtract two spectral bands in a Geotiff image(band2 - band1)
+
+Format: `RS_SubtractBands (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val subtractDF = spark.sql("select RS_SubtractBands(band1, band2) as differenceOfBands from dataframe")
+
+```
+
+## RS_MultiplyBands
+
+Introduction: Multiply two spectral bands in a Geotiff image
+
+Format: `RS_MultiplyBands (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val multiplyDF = spark.sql("select RS_MultiplyBands(band1, band2) as multiplyBands from dataframe")
+
+```
+
+## RS_DivideBands
+
+Introduction: Divide band1 by band2 from a geotiff image
+
+Format: `RS_DivideBands (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val divideDF = spark.sql("select RS_DivideBands(band1, band2) as divideBands from dataframe")
+
+```
+
+## RS_MultiplyFactor
+
+Introduction: Multiply a factor to a spectral band in a geotiff image
+
+Format: `RS_MultiplyFactor (Band1: Array[Double], Factor: Int)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val multiplyFactorDF = spark.sql("select RS_MultiplyFactor(band1, 2) as multiplyfactor from dataframe")
+
+```
+
+## RS_Mode
+
+Introduction: Returns Mode from a spectral band in a Geotiff image in form of an array
+
+Format: `RS_Mode (Band: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val modeDF = spark.sql("select RS_Mode(band) as mode from dataframe")
+
+```
+
+## RS_Mean
+
+Introduction: Returns Mean value for a spectral band in a Geotiff image
+
+Format: `RS_Mean (Band: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val meanDF = spark.sql("select RS_Mean(band) as mean from dataframe")
+
+```
+
+## RS_NormalizedDifference
+
+Introduction: Returns Normalized Difference between two bands(band2 and band1) in a Geotiff image(example: NDVI, NDBI)
+
+Format: `RS_NormalizedDifference (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val normalizedDF = spark.sql("select RS_NormalizedDifference(band1, band2) as normdifference from dataframe")
+
+```
+
+## RS_Count
+
+Introduction: Returns count of a particular value from a spectral band in a raster image
+
+Format: `RS_Count (Band1: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val countDF = spark.sql("select RS_Count(band1, target) as count from dataframe")
+
+```
+
+## RS_GreaterThan
+
+Introduction: Mask all the values with 1 which are greater than a particular target value
+
+Format: `RS_GreaterThan (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val greaterDF = spark.sql("select RS_GreaterThan(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_GreaterThanEqual
+
+Introduction: Mask all the values with 1 which are greater than or equal to a particular target value
+
+Format: `RS_GreaterThanEqual (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val greaterEqualDF = spark.sql("select RS_GreaterThanEqual(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_LessThan
+
+Introduction: Mask all the values with 1 which are less than a particular target value
+
+Format: `RS_LessThan (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val lessDF = spark.sql("select RS_LessThan(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_LessThanEqual
+
+Introduction: Mask all the values with 1 which are less than or equal to a particular target value
+
+Format: `RS_LessThanEqual (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val lessEqualDF = spark.sql("select RS_LessThanEqual(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_Modulo
+
+Introduction: Find modulo of pixels with respect to a particular value
+
+Format: `RS_Modulo (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val moduloDF = spark.sql("select RS_Modulo(band, target) as modulo from dataframe")
+
+```
+
+## RS_BitwiseAND
+
+Introduction: Find Bitwise AND between two bands of Geotiff image
+
+Format: `RS_BitwiseAND (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val bitwiseandDF = spark.sql("select RS_BitwiseAND(band1, band2) as andvalue from dataframe")
+
+```
+
+## RS_BitwiseOR
+
+Introduction: Find Bitwise OR between two bands of Geotiff image
+
+Format: `RS_BitwiseOR (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val bitwiseorDF = spark.sql("select RS_BitwiseOR(band1, band2) as orvalue from dataframe")
+
+```
+
+## RS_SquareRoot
+
+Introduction: Find Square root of band values in a geotiff image
+
+Format: `RS_SquareRoot (Band: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val rootDF = spark.sql("select RS_SquareRoot(band) as squareroot from dataframe")
+
+```
+
+## RS_LogicalDifference
+
+Introduction: Return value from band 1 if a value in band1 and band2 are different, else return 0
+
+Format: `RS_LogicalDifference (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val logicalDifference = spark.sql("select RS_LogicalDifference(band1, band2) as logdifference from dataframe")
+
+```
+
+## RS_LogicalOver
+
+Introduction: Return value from band1 if it's not equal to 0, else return band2 value
+
+Format: `RS_LogicalOver (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val logicalOver = spark.sql("select RS_LogicalOver(band1, band2) as logover from dataframe")
+
+```
+
+## RS_FetchRegion
+
+Introduction: Fetch a sub-region from a given Geotiff image based on the minimumX, minimumY, maximumX and maximumY indexes, as well as the original height and width of the image
+
+Format: `RS_FetchRegion (Band: Array[Double], coordinates: Array[Int], dimensions: Array[Int])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val region = spark.sql("select RS_FetchRegion(Band,Array(0, 0, 1, 2),Array(3, 3)) as Region from dataframe")
+```
+
+## RS_Normalize
+
+Introduction: Normalize the value in the array to [0, 255]
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```SQL
+SELECT RS_Normalize(band)
+```
\ No newline at end of file
diff --git a/docs/tutorial/raster.md b/docs/tutorial/raster.md
new file mode 100644
index 0000000..ca7a346
--- /dev/null
+++ b/docs/tutorial/raster.md
@@ -0,0 +1,19 @@
+# Raster data processing
+
+Starting from `v1.1.0`, Sedona SQL supports raster data sources and raster operators in DataFrame and SQL. Raster support is available in all Sedona language bindings including ==Scala, Java, Python and R==.
+
+## Initial setup
+
+1. [Set up dependencies](../sql/#set-up-dependencies)
+2. [Initiate Spark session](../sql/#initiate-sparksession)
+3. [Register SedonaSQL](../sql/#register-sedonasql)
+
+## API docs
+
+[IO of raster data in DataFrame](../../api/sql/Raster-loader/)
+
+[Map algebra in DataFrame](../../api/sql/Raster-operators/)
+
+## Tutorials
+
+[Python Jupyter Notebook](https://github.com/apache/incubator-sedona/blob/master/binder/ApacheSedonaRaster_1.ipynb)
diff --git a/mkdocs.yml b/mkdocs.yml
index 38d9df8..4e1f76f 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -22,6 +22,7 @@ nav:
- Scala/Java: tutorial/sql.md
- Pure SQL: tutorial/sql-pure-sql.md
- Python: tutorial/sql-python.md
+ - Raster data: tutorial/raster.md
- Map visualization SQL app:
- Scala/Java: tutorial/viz.md
- Use Apache Zeppelin: tutorial/zeppelin.md
@@ -37,11 +38,15 @@ nav:
- Python doc: api/python-api.md
- SQL:
- Quick start: api/sql/Overview.md
- - Constructor: api/sql/Constructor.md
- - Function: api/sql/Function.md
- - Predicate: api/sql/Predicate.md
- - Aggregate function: api/sql/AggregateFunction.md
- - Join query (optimizer): api/sql/Optimizer.md
+ - Vector data:
+ - Constructor: api/sql/Constructor.md
+ - Function: api/sql/Function.md
+ - Predicate: api/sql/Predicate.md
+ - Aggregate function: api/sql/AggregateFunction.md
+ - Join query (optimizer): api/sql/Optimizer.md
+ - Raster data:
+ - Raster input and output: api/sql/Raster-loader.md
+ - Raster operators: api/sql/Raster-operators.md
- Parameter: api/sql/Parameter.md
- Viz:
- DataFrame/SQL: api/viz/sql.md
diff --git a/pom.xml b/pom.xml
index c721318..d1f5e27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,6 +159,19 @@
</exclusion>
</exclusions>
</dependency>
+ <!--for GeoTiff Reader-->
+ <dependency>
+ <groupId>org.geotools</groupId>
+ <artifactId>gt-geotiff</artifactId>
+ <version>${geotools.version}</version>
+ <scope>${dependency.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.geotools</groupId>
+ <artifactId>gt-coverage</artifactId>
+ <version>${geotools.version}</version>
+ <scope>${dependency.scope}</scope>
+ </dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.compat.version}</artifactId>
diff --git a/python-adapter/.gitignore b/python-adapter/.gitignore
index 804f11a..67d8d31 100644
--- a/python-adapter/.gitignore
+++ b/python-adapter/.gitignore
@@ -2,4 +2,5 @@
bin
/.settings
/.classpath
-/.project
\ No newline at end of file
+/.project
+*.iml
\ No newline at end of file
diff --git a/python-adapter/pom.xml b/python-adapter/pom.xml
index 9844754..4b898c9 100644
--- a/python-adapter/pom.xml
+++ b/python-adapter/pom.xml
@@ -87,6 +87,19 @@
</exclusion>
</exclusions>
</dependency>
+ <!--for GeoTiff Reader-->
+ <dependency>
+ <groupId>org.geotools</groupId>
+ <artifactId>gt-geotiff</artifactId>
+ <version>${geotools.version}</version>
+ <scope>${geotools.scope}</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.geotools</groupId>
+ <artifactId>gt-coverage</artifactId>
+ <version>${geotools.version}</version>
+ <scope>${geotools.scope}</scope>
+ </dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
diff --git a/spark-version-converter.py b/spark-version-converter.py
index c04ffe2..ac6fb41 100644
--- a/spark-version-converter.py
+++ b/spark-version-converter.py
@@ -24,7 +24,8 @@ spark3_anchor = 'SPARK3 anchor'
files = ['sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala',
'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala',
'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala',
- 'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala']
+ 'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala',
+ 'sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala']
def switch_version(line):
if line[:2] == '//':
diff --git a/sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..6592fb0
--- /dev/null
+++ b/sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index c8b6fdb..5d0b052 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -21,10 +21,12 @@ package org.apache.sedona.sql.UDF
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}
import org.apache.spark.sql.sedona_sql.expressions._
+import org.apache.spark.sql.sedona_sql.expressions.raster.{RS_AddBands, RS_Array, RS_Base64, RS_BitwiseAnd, RS_BitwiseOr, RS_Count, RS_DivideBands, RS_FetchRegion, RS_GetBand, RS_GreaterThan, RS_GreaterThanEqual, RS_HTML, RS_LessThan, RS_LessThanEqual, RS_LogicalDifference, RS_LogicalOver, RS_Mean, RS_Mode, RS_Modulo, RS_MultiplyBands, RS_MultiplyFactor, RS_Normalize, RS_NormalizedDifference, RS_SquareRoot, RS_SubtractBands}
import org.locationtech.jts.geom.Geometry
object Catalog {
val expressions: Seq[FunctionBuilder] = Seq(
+ // Expression for vectors
ST_PointFromText,
ST_PolygonFromText,
ST_LineStringFromText,
@@ -83,7 +85,34 @@ object Catalog {
ST_LineSubstring,
ST_LineInterpolatePoint,
ST_SubDivideExplode,
- ST_SubDivide
+ ST_SubDivide,
+
+ // Expression for rasters
+ RS_NormalizedDifference,
+ RS_Mean,
+ RS_Mode,
+ RS_FetchRegion,
+ RS_GreaterThan,
+ RS_GreaterThanEqual,
+ RS_LessThan,
+ RS_LessThanEqual,
+ RS_AddBands,
+ RS_SubtractBands,
+ RS_DivideBands,
+ RS_MultiplyFactor,
+ RS_MultiplyBands,
+ RS_BitwiseAnd,
+ RS_BitwiseOr,
+ RS_Count,
+ RS_Modulo,
+ RS_GetBand,
+ RS_SquareRoot,
+ RS_LogicalDifference,
+ RS_LogicalOver,
+ RS_Base64,
+ RS_HTML,
+ RS_Array,
+ RS_Normalize
)
val aggregateExpressions: Seq[Aggregator[Geometry, Geometry, Geometry]] = Seq(
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index bbe8205..d9e60e7 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -1125,7 +1125,6 @@ case class ST_FlipCoordinates(inputExpressions: Seq[Expression])
override def children: Seq[Expression] = inputExpressions
}
-
case class ST_SubDivide(inputExpressions: Seq[Expression])
extends Expression with CodegenFallback {
override def nullable: Boolean = true
@@ -1167,4 +1166,4 @@ case class ST_SubDivideExplode(children: Seq[Expression]) extends Generator {
}
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev
-}
\ No newline at end of file
+}
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
new file mode 100644
index 0000000..760d627
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.expressions.raster
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayData, GenericArrayData}
+import org.apache.spark.sql.sedona_sql.expressions.UserDataGeneratator
+import org.apache.spark.sql.types._
+
+
+
+// Calculate the normalized difference between two bands, element by element:
+// (band2 - band1) / (band2 + band1), rounded to two decimal places (e.g. NDVI, NDBI).
+case class RS_NormalizedDifference(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions: band1 and band2
+    assert(inputExpressions.length == 2)
+    // Evaluate each child exactly once. ArrayData is the common parent of UnsafeArrayData and
+    // GenericArrayData, so the two bands no longer have to share the same representation.
+    val band1 = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val band2 = inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    new GenericArrayData(normalizeddifference(band1, band2))
+  }
+
+  // Walks over band1's indices, so band2 must be at least as long as band1.
+  private def normalizeddifference(band1: Array[Double], band2: Array[Double]): Array[Double] = {
+    val result = new Array[Double](band1.length)
+    for (i <- band1.indices) {
+      // A zero pixel is replaced by -1 before the ratio is computed
+      val first = if (band1(i) == 0) -1.0 else band1(i)
+      val second = if (band2(i) == 0) -1.0 else band2(i)
+      // Scale, round and scale back to keep two decimal places
+      result(i) = ((second - first) / (second + first) * 100).round / 100.toDouble
+    }
+    result
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Calculate the mean value of a particular band, rounded to two decimal places
+case class RS_Mean(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes one input expression: the band
+    assert(inputExpressions.length == 1)
+    // Evaluate the child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    calculateMean(band)
+  }
+
+  // Sum / length, then scale, round and scale back to keep two decimal places
+  private def calculateMean(band: Array[Double]): Double = {
+    val average = band.sum / band.length
+    (average * 100).round / 100.toDouble
+  }
+
+  override def dataType: DataType = DoubleType
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Calculate the mode(s) of a particular band. All values tied for the highest
+// frequency are returned, hence the array result.
+case class RS_Mode(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes one input expression: the band
+    assert(inputExpressions.length == 1)
+    // Evaluate the child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    new GenericArrayData(calculateMode(band))
+  }
+
+  private def calculateMode(band: Array[Double]): Array[Double] = {
+    if (band.isEmpty) {
+      // maxBy throws on an empty collection; an empty band simply has no mode
+      Array.empty[Double]
+    } else {
+      // value -> number of occurrences
+      val grouped = band.toList.groupBy(x => x).mapValues(_.size)
+      val modeValue = grouped.maxBy(_._2)._2
+      // keep every value that reaches the highest frequency
+      grouped.filter(_._2 == modeValue).map(_._1).toArray
+    }
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Fetch a particular region from a raster band given its index bounds
+// (coordinates(0)..coordinates(2) and coordinates(1)..coordinates(3), both inclusive)
+case class RS_FetchRegion(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes three input expressions: band, coordinates and dimensions
+    assert(inputExpressions.length == 3)
+    // Evaluate each child once. ArrayData covers both UnsafeArrayData and GenericArrayData,
+    // so the coordinate and dimension arrays no longer have to be GenericArrayData.
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val coordinates = inputExpressions(1).eval(inputRow).asInstanceOf[ArrayData].toIntArray()
+    val dim = inputExpressions(2).eval(inputRow).asInstanceOf[ArrayData].toIntArray()
+    new GenericArrayData(regionEnclosed(band, coordinates, dim))
+  }
+
+  // Copies the selected cells of the flattened band into a new flattened array.
+  // NOTE(review): dim(0) is used as the stride of the outer index - confirm that callers pass
+  // the length of one row of the flattened band in dim(0); dim(1) is not read here.
+  private def regionEnclosed(Band: Array[Double], coordinates: Array[Int], dim: Array[Int]): Array[Double] = {
+    val outerCount = coordinates(2) - coordinates(0) + 1
+    val innerCount = coordinates(3) - coordinates(1) + 1
+    val result1D = new Array[Double](outerCount * innerCount)
+
+    var k = 0
+    for (i <- coordinates(0) to coordinates(2)) {
+      for (j <- coordinates(1) to coordinates(3)) {
+        result1D(k) = Band(i * dim(0) + j)
+        k += 1
+      }
+    }
+    result1D
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Mark all the band values with 1 which are greater than a particular threshold (0 otherwise)
+case class RS_GreaterThan(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions: the band and the threshold
+    assert(inputExpressions.length == 2)
+    // Evaluate each child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val target = thresholdOf(inputExpressions(1).eval(inputRow))
+    new GenericArrayData(findGreaterThan(band, target))
+  }
+
+  // Accepts a Spark Decimal (the only type supported before) as well as any boxed JVM number,
+  // so Double and Int thresholds no longer fail with a ClassCastException.
+  private def thresholdOf(value: Any): Double = value match {
+    case decimal: Decimal => decimal.toDouble
+    case number: java.lang.Number => number.doubleValue()
+    case other => throw new IllegalArgumentException(s"RS_GreaterThan expects a numeric threshold but got: $other")
+  }
+
+  // Same length as the band: 1 where band(i) > target, 0 elsewhere
+  private def findGreaterThan(band: Array[Double], target: Double): Array[Double] =
+    band.map(value => if (value > target) 1.0 else 0.0)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Mark all the band values with 1 which are greater than or equal to a particular threshold (0 otherwise)
+case class RS_GreaterThanEqual(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions: the band and the threshold
+    assert(inputExpressions.length == 2)
+    // Evaluate each child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val target = thresholdOf(inputExpressions(1).eval(inputRow))
+    new GenericArrayData(findGreaterThanEqual(band, target))
+  }
+
+  // Accepts a Spark Decimal (the only type supported before) as well as any boxed JVM number,
+  // so Double and Int thresholds no longer fail with a ClassCastException.
+  private def thresholdOf(value: Any): Double = value match {
+    case decimal: Decimal => decimal.toDouble
+    case number: java.lang.Number => number.doubleValue()
+    case other => throw new IllegalArgumentException(s"RS_GreaterThanEqual expects a numeric threshold but got: $other")
+  }
+
+  // Same length as the band: 1 where band(i) >= target, 0 elsewhere
+  private def findGreaterThanEqual(band: Array[Double], target: Double): Array[Double] =
+    band.map(value => if (value >= target) 1.0 else 0.0)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Mark all the band values with 1 which are less than a particular threshold (0 otherwise)
+case class RS_LessThan(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions: the band and the threshold
+    assert(inputExpressions.length == 2)
+    // Evaluate each child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val target = thresholdOf(inputExpressions(1).eval(inputRow))
+    new GenericArrayData(findLessThan(band, target))
+  }
+
+  // Accepts a Spark Decimal (the only type supported before) as well as any boxed JVM number,
+  // so Double and Int thresholds no longer fail with a ClassCastException.
+  private def thresholdOf(value: Any): Double = value match {
+    case decimal: Decimal => decimal.toDouble
+    case number: java.lang.Number => number.doubleValue()
+    case other => throw new IllegalArgumentException(s"RS_LessThan expects a numeric threshold but got: $other")
+  }
+
+  // Same length as the band: 1 where band(i) < target, 0 elsewhere
+  private def findLessThan(band: Array[Double], target: Double): Array[Double] =
+    band.map(value => if (value < target) 1.0 else 0.0)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Mark all the band values with 1 which are less than or equal to a particular threshold (0 otherwise)
+case class RS_LessThanEqual(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions: the band and the threshold
+    assert(inputExpressions.length == 2)
+    // Evaluate each child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val target = thresholdOf(inputExpressions(1).eval(inputRow))
+    new GenericArrayData(findLessThanEqual(band, target))
+  }
+
+  // Accepts a Spark Decimal (the only type supported before) as well as any boxed JVM number,
+  // so Double and Int thresholds no longer fail with a ClassCastException.
+  private def thresholdOf(value: Any): Double = value match {
+    case decimal: Decimal => decimal.toDouble
+    case number: java.lang.Number => number.doubleValue()
+    case other => throw new IllegalArgumentException(s"RS_LessThanEqual expects a numeric threshold but got: $other")
+  }
+
+  // Same length as the band: 1 where band(i) <= target, 0 elsewhere
+  private def findLessThanEqual(band: Array[Double], target: Double): Array[Double] =
+    band.map(value => if (value <= target) 1.0 else 0.0)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Count the number of occurrences of a particular value in a band
+case class RS_Count(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions: the band and the value to count
+    assert(inputExpressions.length == 2)
+    // Evaluate each child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val target = targetOf(inputExpressions(1).eval(inputRow))
+    findCount(band, target)
+  }
+
+  // Accepts a Spark Decimal (the only type supported before) as well as any boxed JVM number,
+  // so Double and Int targets no longer fail with a ClassCastException.
+  private def targetOf(value: Any): Double = value match {
+    case decimal: Decimal => decimal.toDouble
+    case number: java.lang.Number => number.doubleValue()
+    case other => throw new IllegalArgumentException(s"RS_Count expects a numeric target but got: $other")
+  }
+
+  // Exact floating-point equality is used for the comparison
+  private def findCount(band: Array[Double], target: Double): Int =
+    band.count(value => value == target)
+
+  override def dataType: DataType = IntegerType
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Multiply all values of a band by a factor
+case class RS_MultiplyFactor(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions: the band and the factor
+    assert(inputExpressions.length == 2)
+    // Evaluate each child once; ArrayData covers both UnsafeArrayData and GenericArrayData
+    val band = inputExpressions(0).eval(inputRow).asInstanceOf[ArrayData].toDoubleArray()
+    val factor = factorOf(inputExpressions(1).eval(inputRow))
+    new GenericArrayData(multiply(band, factor))
+  }
+
+  // Integer factors (the only type supported before) arrive boxed and are widened to Double,
+  // which gives the same product as Double * Int. Decimal and Double factors are accepted too.
+  private def factorOf(value: Any): Double = value match {
+    case decimal: Decimal => decimal.toDouble
+    case number: java.lang.Number => number.doubleValue()
+    case other => throw new IllegalArgumentException(s"RS_MultiplyFactor expects a numeric factor but got: $other")
+  }
+
+  // Returns a new array; the input band is left untouched
+  private def multiply(band: Array[Double], factor: Double): Array[Double] =
+    band.map(value => value * factor)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Add two bands element by element
+case class RS_AddBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData holding band1(i) + band2(i).
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(addBands(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // Element-wise sum; the result has the length of band1
+  private def addBands(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => band1(i) + band2(i))
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Subtract two bands element by element (second input minus first input)
+case class RS_SubtractBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData holding band2(i) - band1(i).
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(subtractBands(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // NOTE: the operand order is band2 - band1, i.e. the first input is subtracted from
+  // the second. Callers depend on this order, so it is kept as is.
+  private def subtractBands(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => band2(i) - band1(i))
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Multiply two bands element by element
+case class RS_MultiplyBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData holding band1(i) * band2(i).
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(multiplyBands(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // Element-wise product; the result has the length of band1
+  private def multiplyBands(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => band1(i) * band2(i))
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Divide two bands element by element (first input divided by second input)
+case class RS_DivideBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData holding band1(i) / band2(i), rounded to two decimals.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(divideBands(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // Quotient scaled by 100, rounded to the nearest Long and scaled back, which keeps two
+  // decimal places. There is no guard against zeros in band2.
+  private def divideBands(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => ((band1(i) / band2(i)) * 100).round / 100.toDouble)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Remainder of every value of a band with respect to a given divisor
+case class RS_Modulo(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Input 0 is the band, input 1 is the divisor (read as a Decimal). Returns a
+  // GenericArrayData holding band(i) % divisor for every position.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the band and the divisor
+    assert(inputExpressions.length == 2)
+    val pixels: Array[Double] = inputExpressions(0).eval(inputRow) match {
+      case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+      case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val divisor = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+
+    new GenericArrayData(modulo(pixels, divisor))
+  }
+
+  // Applies the floating point % operator to each entry
+  private def modulo(band: Array[Double], divisor: Double): Array[Double] =
+    band.map(_ % divisor)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Square root of every value in a band, rounded to two decimals
+case class RS_SquareRoot(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Input 0 is the band. Returns a GenericArrayData with the rounded square roots.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes a single input: the band
+    assert(inputExpressions.length == 1)
+    val pixels: Array[Double] = inputExpressions(0).eval(inputRow) match {
+      case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+      case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    new GenericArrayData(squareRoot(pixels))
+  }
+
+  // sqrt scaled by 100, rounded to the nearest Long and scaled back (two decimals)
+  private def squareRoot(band: Array[Double]): Array[Double] =
+    band.map(value => (Math.sqrt(value) * 100).round / 100.toDouble)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Bitwise AND between two bands
+case class RS_BitwiseAnd(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData holding the AND of the values truncated to Int.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(bitwiseAnd(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // Both operands are converted with toInt before the AND; the Int result is widened
+  // back to Double
+  private def bitwiseAnd(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => (band1(i).toInt & band2(i).toInt).toDouble)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Bitwise OR between two bands
+case class RS_BitwiseOr(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData holding the OR of the values truncated to Int.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(bitwiseOr(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // Both operands are converted with toInt before the OR; the Int result is widened
+  // back to Double
+  private def bitwiseOr(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => (band1(i).toInt | band2(i).toInt).toDouble)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// If the values in band1 and band2 differ, the value from band1 is returned, otherwise 0
+case class RS_LogicalDifference(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData with band1(i) where the bands differ and 0.0 elsewhere.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(logicalDifference(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // Keeps band1(i) only at positions where it is not equal (!=) to band2(i)
+  private def logicalDifference(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => if (band1(i) != band2(i)) band1(i) else 0.0)
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// If a value in band1 is not equal to 0 it is returned, otherwise the value from band2 is returned
+case class RS_LogicalOver(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs 0 and 1 are the two bands; they must have the same length.
+  // Returns a GenericArrayData with band1(i) where it is non-zero and band2(i) elsewhere.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the two bands
+    assert(inputExpressions.length == 2)
+    val band1 = bandValues(inputExpressions(0).eval(inputRow))
+    val band2 = bandValues(inputExpressions(1).eval(inputRow))
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(logicalOver(band1, band2))
+  }
+
+  // Converts one evaluated input to doubles. Each operand is handled on its own so that
+  // an UnsafeArrayData can be combined with a GenericArrayData; casting a mixed pair to
+  // GenericArrayData would fail with a ClassCastException.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // band1 takes precedence wherever it is different from 0.0
+  private def logicalOver(band1: Array[Double], band2: Array[Double]): Array[Double] =
+    Array.tabulate(band1.length)(i => if (band1(i) != 0.0) band1(i) else band2(i))
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Rescale a band so that its largest value maps to 255
+case class RS_Normalize(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Input 0 is the band. Returns a GenericArrayData with the rescaled values.
+  override def eval(inputRow: InternalRow): Any = {
+    // Only the first input expression is read
+    val pixels: Array[Double] = inputExpressions(0).eval(inputRow) match {
+      case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+      case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    new GenericArrayData(normalize(pixels))
+  }
+
+  // Divides every value by (max / 255.0) and truncates the quotient with toInt.
+  // Only the maximum is used for scaling; the minimum of the band is not considered.
+  private def normalize(band: Array[Double]): Array[Double] = {
+    val step = band.max / 255.0
+    band.map(value => (value / step).toInt.toDouble)
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
new file mode 100644
index 0000000..a725ad6
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.expressions.raster
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.sedona_sql.expressions.UserDataGeneratator
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.geotools.coverage.grid.io.GridFormatFinder
+import org.geotools.coverage.grid.{GridCoordinates2D, GridCoverage2D}
+import org.geotools.geometry.jts.JTS
+import org.geotools.referencing.CRS
+import org.geotools.util.factory.Hints
+import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
+import org.opengis.coverage.grid.GridEnvelope
+import org.opengis.referencing.crs.CoordinateReferenceSystem
+import org.opengis.referencing.operation.MathTransform
+
+import java.awt.Color
+import java.awt.image.BufferedImage
+import java.io.{ByteArrayOutputStream, IOException}
+import java.util.Base64
+import javax.imageio.ImageIO
+
+// Reads a raster through GeoTools and exposes the outline of its pixel grid as a polygon
+// transformed to EPSG:4326. getDimensions fills the mutable fields below and
+// readGeometry relies on them, so an instance should not be shared between threads.
+class GeometryOperations {
+
+  var coverage: GridCoverage2D = null
+  var source: CoordinateReferenceSystem = null
+  var target: CoordinateReferenceSystem = null
+  // Despite its name this holds the MathTransform from `source` to `target`
+  var targetCRS: MathTransform = null
+
+  // Reads the coverage behind `url`, stores it together with its CRS and the transform
+  // to EPSG:4326, and returns the grid range (pixel extent) of the coverage.
+  // An IOException raised while reading is rethrown wrapped in a RuntimeException.
+  def getDimensions(url: String): GridEnvelope = {
+    val format = GridFormatFinder.findFormat(url)
+    val hints = new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true)
+    val reader = format.getReader(url, hints)
+
+    try {
+      coverage = reader.read(null)
+    } catch {
+      case giveUp: IOException =>
+        throw new RuntimeException(giveUp)
+    } finally {
+      // Dispose the reader on the failure path as well, otherwise it leaks
+      reader.dispose()
+    }
+    source = coverage.getCoordinateReferenceSystem
+    target = CRS.decode("EPSG:4326", true)
+    targetCRS = CRS.findMathTransform(source, target)
+    coverage.getGridGeometry.getGridRange
+  }
+
+  // Builds the polygon through the four corners of the grid range of the raster at `url`,
+  // converted from grid to world coordinates and then transformed with `targetCRS`.
+  def readGeometry(url: String): Geometry = {
+    val gridRange2D = getDimensions(url)
+    val (lowX, lowY) = (gridRange2D.getLow(0), gridRange2D.getLow(1))
+    val (highX, highY) = (gridRange2D.getHigh(0), gridRange2D.getHigh(1))
+    val gridCorners = Seq((lowX, lowY), (lowX, highY), (highX, highY), (highX, lowY))
+
+    val ring = gridCorners.map { case (x, y) =>
+      val world = coverage.getGridGeometry.gridToWorld(new GridCoordinates2D(x, y))
+      new Coordinate(world.getOrdinate(0), world.getOrdinate(1))
+    }
+
+    // The polygon shell has to end on the coordinate it started with
+    val closedRing = (ring :+ ring.head).toArray
+    val factory = new GeometryFactory
+    JTS.transform(factory.createPolygon(closedRing), targetCRS)
+  }
+}
+
+// Get a particular band from an array that stores all bands of an image one after another
+case class RS_GetBand(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Input 0 is the array holding all bands, input 1 the 1-based index of the wanted band
+  // and input 2 the total number of bands. Returns the wanted band as GenericArrayData.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes three inputs: band data, target band and number of bands
+    assert(inputExpressions.length == 3)
+    // Accept both array representations, like the other raster functions do; a plain cast
+    // to UnsafeArrayData fails when the data comes from another expression
+    val bandInfo = inputExpressions(0).eval(inputRow) match {
+      case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+      case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val targetBand = inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+    val totalBands = inputExpressions(2).eval(inputRow).asInstanceOf[Int]
+    val result = gettargetband(bandInfo, targetBand, totalBands)
+    new GenericArrayData(result)
+  }
+
+  // Fetch the target band from the given array of bands. All bands are taken to have the
+  // same size, bandinfo.length / totalbands, and band k occupies the k-th chunk (1-based).
+  private def gettargetband(bandinfo: Array[Double], targetband:Int, totalbands:Int): Array[Double] = {
+    val sizeOfBand = bandinfo.length/totalbands
+    val lowerBound = (targetband - 1)*sizeOfBand
+    val upperBound = targetband*sizeOfBand
+    val band = bandinfo.slice(lowerBound, upperBound)
+    // slice silently truncates, so a short result means targetband was out of range
+    assert(band.length == sizeOfBand)
+    band
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Create a band of a given length in which every entry holds the same value
+case class RS_Array(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Input 0 is the length (read as an Int), input 1 the fill value (read as a Decimal).
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two inputs: the length and the fill value
+    assert(inputExpressions.length == 2)
+    val size = inputExpressions(0).eval(inputRow).asInstanceOf[Int]
+    val fillValue = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+    new GenericArrayData(createarray(size, fillValue))
+  }
+
+  // Allocates an array of `len` doubles and sets every entry to `num`
+  private def createarray(len: Int, num: Double): Array[Double] = {
+    val filled = new Array[Double](len)
+    java.util.Arrays.fill(filled, num)
+    filled
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Render three or four bands as a PNG image and return it as a Base64 string
+case class RS_Base64(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Inputs: height, width, the red, green and blue bands and an optional alpha band.
+  // Returns the Base64 text of the PNG encoded image as a UTF8String.
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes five inputs, or six when an alpha band is supplied
+    assert(inputExpressions.length>=5 && inputExpressions.length<=6)
+
+    val height = inputExpressions(0).eval(inputRow).asInstanceOf[Int]
+    val width = inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+    val band1 = bandValues(inputExpressions(2).eval(inputRow))
+    val band2 = bandValues(inputExpressions(3).eval(inputRow))
+    val band3 = bandValues(inputExpressions(4).eval(inputRow))
+    // null means "no alpha band": every pixel is then fully opaque
+    val band4: Array[Double] =
+      if (inputExpressions.length == 5) null
+      else bandValues(inputExpressions(5).eval(inputRow))
+
+    val bufferedimage = getBufferedimage(band1, band2, band3, band4, height, width)
+    UTF8String.fromString(convertToBase64(bufferedimage))
+  }
+
+  // Converts one evaluated band to doubles. Every band may arrive as UnsafeArrayData
+  // (e.g. a stored column) or as GenericArrayData (e.g. the result of another raster
+  // function), so all of them are handled the same way.
+  private def bandValues(value: Any): Array[Double] = value match {
+    case unsafe: UnsafeArrayData => unsafe.toDoubleArray()
+    case other => other.asInstanceOf[GenericArrayData].toDoubleArray()
+  }
+
+  // Builds an ARGB image from bands laid out row by row: pixel i goes to column
+  // i % width of row i / width. Deriving the row from the width is what keeps
+  // non-square images inside the image bounds. java.awt.Color rejects components
+  // outside 0-255 with an IllegalArgumentException.
+  private def getBufferedimage(band1:Array[Double], band2:Array[Double], band3:Array[Double], band4:Array[Double], height:Int, width:Int): BufferedImage = {
+    val image = new BufferedImage(width, height, BufferedImage.TYPE_INT_ARGB)
+    for (i <- 0 until (height * width)) {
+      val alpha = if (band4 == null) 255 else band4(i).toInt
+      val pixel = new Color(band1(i).toInt, band2(i).toInt, band3(i).toInt, alpha)
+      image.setRGB(i % width, i / width, pixel.getRGB())
+    }
+    image
+  }
+
+  // Convert a buffered image to a Base64 string holding its PNG encoding
+  private def convertToBase64(image: BufferedImage): String = {
+    val os = new ByteArrayOutputStream()
+    ImageIO.write(image,"png", os)
+    Base64.getEncoder.encodeToString(os.toByteArray)
+  }
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Wrap a Base64 encoded PNG image in an HTML <img> tag
+case class RS_HTML(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  // Input 0 is the Base64 string; an optional input 1 is the image width as a string.
+  // When exactly two inputs are given the second one is used, otherwise the width is "200".
+  override def eval(inputRow: InternalRow): Any = {
+    val encoded = inputExpressions(0).eval(inputRow).asInstanceOf[UTF8String].toString
+    val imageWidth =
+      if (inputExpressions.length == 2) inputExpressions(1).eval(inputRow).asInstanceOf[UTF8String].toString
+      else "200"
+    UTF8String.fromString(htmlstring(encoded, imageWidth))
+  }
+
+  // Create the <img> element; both values are inserted as they are, without escaping
+  private def htmlstring(encodestring: String, imageWidth: String): String =
+    s"""<img src="${createmainstring(encodestring)}" width="$imageWidth" />"""
+
+  // Prefix the Base64 payload so that it forms a PNG data URI
+  private def createmainstring(encodestring:String): String =
+    "data:image/png;base64," + encodestring
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
new file mode 100644
index 0000000..cf9a314
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.spark.sql.sedona_sql.io
+
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+// Read-only Spark FileFormat registered under the short name "geotiff".
+// Every input file is loaded fully into memory and decoded by GeotiffSchema.decode into
+// one row; the schema is always GeotiffSchema.imageSchema.
+private[spark] class GeotiffFileFormat extends FileFormat with DataSourceRegister {
+
+ // The schema is fixed: options and files are not inspected
+ override def inferSchema(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = Some(GeotiffSchema.imageSchema)
+
+ // Writing is not implemented; this always throws UnsupportedOperationException
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ throw new UnsupportedOperationException("Write is not supported for image data source")
+ }
+
+ override def shortName(): String = "geotiff"
+
+ // Returns the function that turns one PartitionedFile into its rows.
+ // With the dropInvalid option set, files that GeotiffSchema.decode cannot decode yield
+ // no row; otherwise they yield GeotiffSchema.invalidImageRow(origin).
+ override protected def buildReader(
+ sparkSession: SparkSession,
+ dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
+ filters: Seq[Filter],
+ options: Map[String, String],
+ hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+ assert(
+ requiredSchema.length <= 1,
+ "Image data source only produces a single data column named \"image\".")
+
+ // Broadcast the Hadoop configuration so the returned closure can open files with it
+ val broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+ val imageSourceOptions = new ImageOptions(options)
+
+ (file: PartitionedFile) => {
+ val emptyUnsafeRow = new UnsafeRow(0)
+ // No column requested and invalid files are kept: every file contributes exactly one
+ // row, so the file does not have to be opened at all
+ if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) {
+ Iterator(emptyUnsafeRow)
+ } else {
+ val origin = file.filePath
+ val path = new Path(origin)
+ val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
+ val stream = fs.open(path)
+ // Read the whole file into memory; the stream is closed even if reading fails.
+ // Passing true asks Guava to swallow an IOException raised by close itself.
+ val bytes = try {
+ ByteStreams.toByteArray(stream)
+ } finally {
+ Closeables.close(stream, true)
+ }
+
+ // resultOpt is empty when the bytes could not be decoded
+ val resultOpt = GeotiffSchema.decode(origin, bytes)
+ val filteredResult = if (imageSourceOptions.dropInvalid) {
+ resultOpt.toIterator
+ } else {
+ Iterator(resultOpt.getOrElse(GeotiffSchema.invalidImageRow(origin)))
+ }
+
+ if (requiredSchema.isEmpty) {
+ filteredResult.map(_ => emptyUnsafeRow)
+ } else {
+ // NOTE(review): the "SPARK3 anchor" / "SPARK2 anchor" markers below presumably let
+ // spark-version-converter.py switch between the two variants; keep the marked lines
+ // textually intact - TODO confirm against that script
+ val converter = RowEncoder(requiredSchema).createSerializer() // SPARK3 anchor
+ filteredResult.map(row => converter(row)) // SPARK3 anchor
+// val converter = RowEncoder(requiredSchema) // SPARK2 anchor
+// filteredResult.map(row => converter.toRow(row)) // SPARK2 anchor
+ }
+ }
+ }
+ }
+}
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala
new file mode 100644
index 0000000..bafa0f2
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.io
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types._
+import org.geotools.coverage.grid.io.{AbstractGridFormat, GridCoverage2DReader, OverviewPolicy}
+import org.geotools.coverage.grid.{GridCoordinates2D, GridCoverage2D}
+import org.geotools.gce.geotiff.GeoTiffReader
+import org.geotools.geometry.jts.JTS
+import org.geotools.referencing.CRS
+import org.locationtech.jts.geom.{Coordinate, GeometryFactory}
+import org.opengis.coverage.grid.{GridCoordinates, GridEnvelope}
+import org.opengis.parameter.{GeneralParameterValue, ParameterValue}
+
+import java.io.ByteArrayInputStream
+
+object GeotiffSchema {
+  val undefinedImageType = "Undefined"
+
+  /**
+   * Schema for the image column:
+   * Row(origin: String, Geometry: Geometry, height: Int, width: Int, nBands: Int, data: Array[Double])
+   */
+  val columnSchema = StructType(
+    StructField("origin", StringType, true) ::
+      StructField("Geometry", GeometryUDT, true) ::
+      StructField("height", IntegerType, false) ::
+      StructField("width", IntegerType, false) ::
+      StructField("nBands", IntegerType, false) ::
+      StructField("data", ArrayType(DoubleType), false) :: Nil)
+
+  val imageFields: Array[String] = columnSchema.fieldNames
+
+  /**
+   * DataFrame with a single column of images named "image" (nullable)
+   */
+  val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)
+
+  /**
+   * Gets the origin of the image
+   *
+   * @return The origin of the image
+   */
+  def getOrigin(row: Row): String = row.getString(0)
+
+  /**
+   * Gets the geometry (footprint polygon) of the image
+   *
+   * NOTE(review): the declared type is `GeometryUDT` (the Spark type descriptor), while
+   * `decode` stores a JTS geometry in this field. The signature is kept as-is for
+   * compatibility; confirm whether callers actually expect a JTS `Geometry`.
+   *
+   * @return The value stored in the "Geometry" field
+   */
+  def getGeometry(row: Row): GeometryUDT = row.getAs[GeometryUDT](1)
+
+  /**
+   * Gets the height of the image
+   *
+   * @return The height of the image
+   */
+  def getHeight(row: Row): Int = row.getInt(2)
+
+  /**
+   * Gets the width of the image
+   *
+   * @return The width of the image
+   */
+  def getWidth(row: Row): Int = row.getInt(3)
+
+  /**
+   * Gets the number of bands in the image
+   *
+   * @return The number of bands in the image
+   */
+  def getNBands(row: Row): Int = row.getInt(4)
+
+  /**
+   * Gets the image data
+   *
+   * @return The image data
+   */
+  def getData(row: Row): Array[Double] = row.getAs[Array[Double]](5)
+
+  /**
+   * Default values for the invalid image: no geometry, `-1` for height, width and nBands,
+   * and empty data. The row has one value per field of `columnSchema`, in schema order.
+   *
+   * @param origin Origin of the invalid image
+   * @return Row with the default values
+   */
+  private[io] def invalidImageRow(origin: String): Row =
+    Row(Row(origin, null, -1, -1, -1, Array.empty[Double]))
+
+  /**
+   * Convert a GeoTiff image into a dataframe row
+   *
+   * @param origin Arbitrary string that identifies the image
+   * @param bytes  Content of the GeoTiff file
+   * @return DataFrame Row, or None if the reader could not be created because of a
+   *         `RuntimeException`. Other exceptions (e.g. `IOException`) are propagated.
+   */
+  private[io] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
+    // Read Geotiff image from Byte Array
+    val readerOpt: Option[GridCoverage2DReader] = try {
+      Some(new GeoTiffReader(new ByteArrayInputStream(bytes)))
+    } catch {
+      // Catch runtime exception because `ImageIO` may throw unexpected `RuntimeException`.
+      // But do not catch the declared `IOException` (regarded as FileSystem failure)
+      case _: RuntimeException => None
+    }
+
+    readerOpt.map { reader =>
+      val coverage: GridCoverage2D = reader.read(readParameters())
+      val polygon = footprint(coverage)
+
+      // Fetch band values from given image
+      val nBands: Int = coverage.getNumSampleDimensions
+      val dimensions: GridEnvelope = reader.getOriginalGridRange
+      val maxDimensions: GridCoordinates = dimensions.getHigh
+      val width: Int = maxDimensions.getCoordinateValue(0) + 1
+      val height: Int = maxDimensions.getCoordinateValue(1) + 1
+      val decoded = bandValues(coverage, height, width, nBands)
+
+      // the internal "Row" is needed, because the image is a single DataFrame column
+      Row(Row(origin, polygon, height, width, nBands, decoded))
+    }
+  }
+
+  /** Builds the parameters passed to the GeoTiff reader: ignore overviews, use JAI image read. */
+  private def readParameters(): Array[GeneralParameterValue] = {
+    val policy: ParameterValue[OverviewPolicy] = AbstractGridFormat.OVERVIEW_POLICY.createValue
+    policy.setValue(OverviewPolicy.IGNORE)
+    val gridsize: ParameterValue[String] = AbstractGridFormat.SUGGESTED_TILE_SIZE.createValue
+    val useJaiRead: ParameterValue[Boolean] = AbstractGridFormat.USE_JAI_IMAGEREAD.createValue.asInstanceOf[ParameterValue[Boolean]]
+    useJaiRead.setValue(true)
+    Array[GeneralParameterValue](policy, gridsize, useJaiRead)
+  }
+
+  /** Computes the polygon spanned by the four grid corners of the coverage, transformed to EPSG:4326. */
+  private def footprint(coverage: GridCoverage2D): org.locationtech.jts.geom.Geometry = {
+    val source = coverage.getCoordinateReferenceSystem
+    val target = CRS.decode("EPSG:4326", true)
+    val toTarget = CRS.findMathTransform(source, target)
+    val gridGeometry = coverage.getGridGeometry
+    val gridRange2D = gridGeometry.getGridRange
+    val corners = Array(
+      (gridRange2D.getLow(0), gridRange2D.getLow(1)),
+      (gridRange2D.getLow(0), gridRange2D.getHigh(1)),
+      (gridRange2D.getHigh(0), gridRange2D.getHigh(1)),
+      (gridRange2D.getHigh(0), gridRange2D.getLow(1)))
+    val ring = corners.map { case (x, y) =>
+      val world = gridGeometry.gridToWorld(new GridCoordinates2D(x, y))
+      new Coordinate(world.getOrdinate(0), world.getOrdinate(1))
+    }
+    // A polygon ring has to end on its first coordinate
+    val closedRing = ring :+ ring.head
+    JTS.transform(new GeometryFactory().createPolygon(closedRing), toTarget)
+  }
+
+  /**
+   * Reads every sample of the coverage into one flat array, band after band:
+   * [b0...b1...b2...], where each "..." holds height * width values in row-major order.
+   */
+  private def bandValues(coverage: GridCoverage2D, height: Int, width: Int, nBands: Int): Array[Double] = {
+    // Computed as Long so that an oversized image is rejected instead of overflowing Int
+    val imageSize = height.toLong * width * nBands
+    assert(imageSize < 1e9, "image is too large")
+    val decoded = Array.ofDim[Double](imageSize.toInt)
+    val pixelsPerBand = height * width
+    // Scratch buffer that `evaluate` fills with all band values of one pixel
+    val vals: Array[Double] = new Array[Double](nBands)
+
+    for (i <- 0 until height; j <- 0 until width) {
+      coverage.evaluate(new GridCoordinates2D(j, i), vals)
+      for (bandId <- 0 until nBands) {
+        decoded(bandId * pixelsPerBand + i * width + j) = vals(bandId)
+      }
+    }
+    decoded
+  }
+}
+
+
+
+
+
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala
new file mode 100644
index 0000000..54c5377
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.io
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.sql.SparkSession
+
+import scala.language.existentials
+import scala.util.Random
+
+object RecursiveFlag {
+
+  /** Sets or clears the recursive-input-directory flag in the session's Hadoop configuration
+   *
+   * @param value new value of the flag; `None` removes the flag
+   * @param spark existing spark session whose Hadoop configuration is modified
+   * @return value the flag had before this call, `None` if it was not set
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): Option[String] = {
+    val key = FileInputFormat.INPUT_DIR_RECURSIVE
+    val conf = spark.sparkContext.hadoopConfiguration
+    val previous = Option(conf.get(key))
+    value.fold(conf.unset(key))(conf.set(key, _))
+    previous
+  }
+}
+
+
+/**
+ * Path filter that lets through only a random fraction of the files.
+ * The fraction is read from the configuration key `SamplePathFilter.ratioParam`.
+ */
+class SamplePathFilter extends Configured with PathFilter {
+  // Seeded with 0, so every instance draws the same sequence of numbers
+  val random = new Random(0L)
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  /** Picks up the sample ratio from `conf`; a null `conf` leaves the current ratio untouched. */
+  override def setConf(conf: Configuration): Unit = {
+    Option(conf).foreach { c =>
+      sampleRatio = c.getDouble(SamplePathFilter.ratioParam, 1)
+    }
+  }
+
+  /** Accepts every path that does not look like a file, and a random share of those that do. */
+  override def accept(path: Path): Boolean = {
+    // Note: checking fileSystem.isDirectory is very slow here, so we use basic rules instead
+    if (SamplePathFilter.isFile(path)) {
+      random.nextDouble() < sampleRatio
+    } else {
+      true
+    }
+  }
+}
+
+object SamplePathFilter {
+  /** Hadoop configuration key that carries the sample ratio. */
+  val ratioParam = "sampleRatio"
+
+  /** Treats a path as a file when its name has a non-empty extension; the file system is not queried. */
+  def isFile(path: Path): Boolean = FilenameUtils.getExtension(path.toString) != ""
+
+  /** Set/unset hdfs PathFilter
+   *
+   * @param value       Filter class that is passed to HDFS; `None` removes the filter setting
+   * @param sampleRatio Fraction of the files that the filter picks; `None` removes the ratio setting
+   * @param spark       Existing Spark session
+   * @return Filter class that was configured before this call, `None` if there was none
+   */
+  def setPathFilter(value: Option[Class[_]], sampleRatio: Option[Double] = None, spark: SparkSession)
+  : Option[Class[_]] = {
+    val flagName = FileInputFormat.PATHFILTER_CLASS
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val old = Option(hadoopConf.getClass(flagName, null))
+
+    sampleRatio match {
+      case Some(ratio) => hadoopConf.setDouble(SamplePathFilter.ratioParam, ratio)
+      case None => hadoopConf.unset(SamplePathFilter.ratioParam)
+    }
+
+    value match {
+      case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
+      case None => hadoopConf.unset(flagName)
+    }
+    old
+  }
+}
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageOptions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageOptions.scala
new file mode 100644
index 0000000..d736de1
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageOptions.scala
@@ -0,0 +1,12 @@
+package org.apache.spark.sql.sedona_sql.io
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+/** Options of the image data source, looked up case-insensitively by key. */
+private[io] class ImageOptions(@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
+
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+
+  /**
+   * Value of the "dropInvalid" option; `false` when the option is absent.
+   * If true, invalid images are meant to be removed; otherwise they are meant to be
+   * returned with empty data and all other fields filled with `-1`. This class only
+   * parses the flag; the reader that consumes it implements that behavior.
+   */
+  val dropInvalid = parameters.get("dropInvalid").exists(_.toBoolean)
+}
\ No newline at end of file
diff --git a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 1c33fe9..5cf0c71 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.{BeforeAndAfterAll, FunSpec}
trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
+ Logger.getRootLogger().setLevel(Level.WARN)
Logger.getLogger("org.apache").setLevel(Level.WARN)
Logger.getLogger("com").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
diff --git a/sql/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala b/sql/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
new file mode 100644
index 0000000..c23a1f7
--- /dev/null
+++ b/sql/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sedona.sql
+
+import org.locationtech.jts.geom.Geometry
+import org.scalatest.{BeforeAndAfter, GivenWhenThen}
+
+import scala.collection.mutable
+
+/** Tests for the "geotiff" data source and the raster functions applied to what it loads. */
+class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen {
+
+  var rasterdatalocation: String = resourceFolder + "raster/"
+
+  // Projection of the nested "image" struct into top-level columns, in this order:
+  // origin(0), Geom(1), height(2), width(3), data(4), bands(5)
+  private val imageColumns = Seq("image.origin as origin","image.Geometry as Geom", "image.height as height", "image.width as width", "image.data as data", "image.nBands as bands")
+
+  /** Loads the test rasters with the "geotiff" source, dropping files that cannot be decoded. */
+  private def loadGeotiff(): org.apache.spark.sql.DataFrame =
+    sparkSession.read.format("geotiff").option("dropInvalid", true).load(rasterdatalocation)
+
+  /** Loads the test rasters and keeps only band 1 together with the image dimensions. */
+  private def loadFirstBand(): org.apache.spark.sql.DataFrame =
+    loadGeotiff()
+      .selectExpr(imageColumns: _*)
+      .selectExpr("RS_GetBand(data, 1, bands) as targetBand", "width","height")
+
+  describe("Raster IO test") {
+    it("Should Pass geotiff loading") {
+      val df = loadGeotiff().selectExpr(imageColumns: _*)
+      // Fetch the row once instead of once per assertion
+      val first = df.first()
+      assert(first.getAs[Geometry](1).toText == "POLYGON ((-117.64141128097314 33.94356351407699, -117.64141128097314 33.664978146501284, -117.30939395196258 33.664978146501284," +
+        " -117.30939395196258 33.94356351407699, -117.64141128097314 33.94356351407699))")
+      assert(first.getInt(2) == 517)
+      assert(first.getInt(3) == 512)
+      assert(first.getInt(5) == 1)
+      val blackBand = first.getAs[mutable.WrappedArray[Double]](4)
+      val line1 = blackBand.slice(0, 512)
+      val line2 = blackBand.slice(512, 1024)
+      assert(line1(0) == 0.0) // The first value at line 1 is black
+      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, value at 159 is black and at 160 is not black
+    }
+
+    it("should pass RS_GetBand") {
+      val df = loadGeotiff()
+        .selectExpr(" image.data as data", "image.nBands as bands")
+        .selectExpr("RS_GetBand(data, 1, bands) as targetBand")
+      assert(df.first().getAs[mutable.WrappedArray[Double]](0).length == 512 * 517)
+    }
+
+    it("should pass RS_Base64") {
+      loadFirstBand().createOrReplaceTempView("geotiff")
+      // NOTE(review): no action is invoked on the result and nothing is asserted, so this
+      // only checks that the query can be built. Consider asserting on the encoded string.
+      sparkSession.sql("Select RS_base64(height, width, targetBand, RS_Array(height*width, 0), RS_Array(height*width, 0)) as encodedstring from geotiff")
+    }
+
+    it("should pass RS_HTML") {
+      loadFirstBand().createOrReplaceTempView("geotiff")
+      val encoded = sparkSession.sql("Select RS_base64(height, width, targetBand, RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as encodedstring from geotiff")
+      val html = encoded.selectExpr("RS_HTML(encodedstring, '300') as htmlstring" )
+      assert(html.first().getAs[String](0).contains("img"))
+    }
+  }
+}
+
+
+
+
+
diff --git a/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala b/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
new file mode 100644
index 0000000..23fea93
--- /dev/null
+++ b/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.scalatest.{BeforeAndAfter, GivenWhenThen}
+
+import scala.collection.mutable
+
+
+/** Tests for the band-level raster functions (RS_*), run on small in-memory bands. */
+class rasteralgebraTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen{
+
+  import sparkSession.implicits._
+
+  /** Returns column 0 of the first row of `df` as a band. Only the first row is inspected. */
+  private def firstBand(df: org.apache.spark.sql.DataFrame): mutable.WrappedArray[Double] =
+    df.first().getAs[mutable.WrappedArray[Double]](0)
+
+  describe("should pass all the arithmetic operations on bands") {
+    it("Passed RS_AddBands") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(400.0, 900.0, 1400.0))).toDF("sumOfBands")
+      val resultDf = inputDf.selectExpr("RS_AddBands(Band1,Band2) as sumOfBands")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_SubtractBands") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(0.0, 100.0, 200.0))).toDF("differenceOfBands")
+      val resultDf = inputDf.selectExpr("RS_SubtractBands(Band1,Band2) as differenceOfBands")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_MultiplyBands") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(40000.0, 200000.0, 480000.0))).toDF("MultiplicationOfBands")
+      val resultDf = inputDf.selectExpr("RS_MultiplyBands(Band1,Band2) as multiplicationOfBands")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_DivideBands") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 200.0, 500.0)), ((Seq(0.4, 0.26, 0.27), Seq(0.3, 0.32, 0.43)))).toDF("Band1", "Band2")
+      val expectedList = List(List(1.0, 2.0, 1.2), List(1.33, 0.81, 0.63))
+      val actualList = inputDf.selectExpr("RS_DivideBands(Band1,Band2) as divisionOfBands").as[List[Double]].collect().toList
+      // zip would silently drop unmatched rows, so check the row count first
+      assert(actualList.length == expectedList.length)
+      for((actual, expected) <- actualList zip expectedList) {
+        assert(actual == expected)
+      }
+    }
+
+    it("Passed RS_MultiplyFactor") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0))).toDF("Band")
+      val expectedDF = Seq((Seq(600.0, 1200.0, 1800.0))).toDF("multiply")
+      val resultDf = inputDf.selectExpr("RS_MultiplyFactor(Band, 3) as multiply")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+  }
+
+  describe("Should pass basic statistical tests") {
+    it("Passed RS_Mode") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0, 200.0)), (Seq(200.0, 400.0, 600.0, 700.0))).toDF("Band")
+      val expectedResult = List(List(200.0), List(200.0, 400.0, 600.0, 700.0))
+      val actualResult = inputDf.selectExpr("RS_Mode(Band) as mode").as[List[Double]].collect().toList
+      assert(actualResult.length == expectedResult.length)
+      for((actual, expected) <- actualResult zip expectedResult) {
+        assert(actual == expected)
+      }
+    }
+
+    it("Passed RS_Mean") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0, 200.0)), (Seq(200.0, 400.0, 600.0, 700.0)), (Seq(0.43, 0.36, 0.73, 0.56)) ).toDF("Band")
+      val expectedList = List(350.0,475.0,0.52)
+      val actualList = inputDf.selectExpr("RS_Mean(Band) as mean").as[Double].collect().toList
+      assert(actualList.length == expectedList.length)
+      for((actual, expected) <- actualList zip expectedList) {
+        assert(actual == expected)
+      }
+    }
+
+    it("Passed RS_NormalizedDifference") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(0.0, 0.11, 0.14))).toDF("normalizedDifference")
+      val resultDf = inputDf.selectExpr("RS_NormalizedDifference(Band1,Band2) as normalizedDifference")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_Count") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0, 200.0, 600.0, 600.0, 800.0))).toDF("Band")
+      val expectedDF = Seq(3).toDF("Count")
+      val resultDf = inputDf.selectExpr("RS_Count(Band, 600.0) as Count")
+      assert(resultDf.first().getAs[Int](0) == expectedDF.first().getAs[Int](0))
+    }
+  }
+
+  describe("Should pass operator tests") {
+    // The comparison tests below feed two rows but, like the other tests, verify the first row only
+    it("Passed RS_GreaterThan") {
+      val inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(1.0,1.0,0.0,0.0,1.0,1.0,0.0)), (Seq(0.0,0.0,0.0,1.0,0.0))).toDF("GreaterThan")
+      val resultDf = inputDf.selectExpr("RS_GreaterThan(Band, 0.2) as GreaterThan")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_GreaterThanEqual") {
+      val inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(1.0,1.0,0.0,1.0,1.0,1.0,0.0)), (Seq(0.0,0.0,0.0,1.0,0.0))).toDF("GreaterThanEqual")
+      val resultDf = inputDf.selectExpr("RS_GreaterThanEqual(Band, 0.2) as GreaterThanEqual")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_LessThan") {
+      val inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(0.0,0.0,1.0,0.0,0.0,0.0,1.0)), (Seq(1.0,1.0,1.0,0.0,1.0))).toDF("LessThan")
+      val resultDf = inputDf.selectExpr("RS_LessThan(Band, 0.2) as LessThan")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_LessThanEqual") {
+      val inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(0.0,0.0,1.0,1.0,0.0,0.0,1.0)), (Seq(1.0,1.0,1.0,0.0,1.0))).toDF("LessThanEqual")
+      val resultDf = inputDf.selectExpr("RS_LessThanEqual(Band, 0.2) as LessthanEqual")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_Modulo") {
+      val inputDf = Seq((Seq(100.0, 260.0, 189.0, 106.0, 230.0, 169.0, 196.0)), (Seq(230.0, 345.0, 136.0, 106.0, 134.0, 105.0))).toDF("Band")
+      val expectedDF = Seq((Seq(10.0, 80.0, 9.0, 16.0, 50.0, 79.0, 16.0)), (Seq(50.0, 75.0, 46.0, 16.0, 44.0, 15.0))).toDF("Modulo")
+      val resultDf = inputDf.selectExpr("RS_Modulo(Band, 90.0) as Modulo")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_BitwiseAND") {
+      val inputDf = Seq((Seq(10.0, 20.0, 30.0), Seq(10.0, 20.0, 30.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(10.0, 20.0, 30.0))).toDF("AND")
+      val resultDf = inputDf.selectExpr("RS_BitwiseAND(Band1, Band2) as AND")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_BitwiseOR") {
+      val inputDf = Seq((Seq(10.0, 20.0, 30.0), Seq(40.0, 22.0, 62.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(42.0, 22.0, 62.0))).toDF("OR")
+      val resultDf = inputDf.selectExpr("RS_BitwiseOR(Band1, Band2) as OR")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_SquareRoot") {
+      val inputDf = Seq((Seq(8.0, 16.0, 24.0))).toDF("Band")
+      val expectedDF = Seq((Seq(2.83, 4.0, 4.90))).toDF("SquareRoot")
+      val resultDf = inputDf.selectExpr("RS_SquareRoot(Band) as SquareRoot")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_LogicalDifference") {
+      val inputDf = Seq((Seq(10.0, 20.0, 30.0), Seq(40.0, 20.0, 50.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(10.0, 0.0, 30.0))).toDF("LogicalDifference")
+      val resultDf = inputDf.selectExpr("RS_LogicalDifference(Band1, Band2) as LogicalDifference")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_LogicalOver") {
+      val inputDf = Seq((Seq(0.0, 0.0, 30.0), Seq(40.0, 20.0, 50.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(40.0, 20.0, 30.0))).toDF("LogicalOR")
+      val resultDf = inputDf.selectExpr("RS_LogicalOver(Band1, Band2) as LogicalOR")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("Passed RS_FetchRegion") {
+      val inputDf = Seq((Seq(100.0, 260.0, 189.0, 106.0, 230.0, 169.0, 196.0, 200.0, 460.0))).toDF("Band")
+      val expectedDF = Seq(Seq(100.0, 260.0, 189.0, 106.0, 230.0, 169.0)).toDF("Region")
+      val resultDf = inputDf.selectExpr("RS_FetchRegion(Band,Array(0, 0, 1, 2),Array(3, 3)) as Region")
+      assert(firstBand(resultDf) == firstBand(expectedDF))
+    }
+
+    it("should pass RS_Normalize") {
+      val inputDf = Seq((Seq(800.0, 900.0, 0.0, 255.0)), (Seq(100.0, 200.0, 700.0, 900.0))).toDF("Band")
+      val resultDf = inputDf.selectExpr("RS_Normalize(Band) as normalizedBand")
+      assert(firstBand(resultDf)(1) == 255)
+    }
+  }
+}