Posted to commits@sedona.apache.org by ji...@apache.org on 2021/06/17 09:36:28 UTC

[incubator-sedona] branch master updated: [SEDONA-30] Add raster data support in Sedona SQL (#523)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fd688f  [SEDONA-30] Add raster data support in Sedona SQL (#523)
8fd688f is described below

commit 8fd688f4c26374bf2f4811d1dc4c333d9acd3b4d
Author: shantanuaggarwal2695 <sh...@gmail.com>
AuthorDate: Thu Jun 17 02:36:18 2021 -0700

    [SEDONA-30] Add raster data support in Sedona SQL (#523)
    
    Co-authored-by: Jia Yu <ji...@gmail.com>
    Co-authored-by: Jia Yu <ji...@apache.org>
---
 .gitignore                                         |   2 +-
 binder/ApacheSedonaRaster_1.ipynb                  | 871 +++++++++++++++++++++
 binder/data/raster/T21HUB_4704_4736_8224_8256.tif  | Bin 0 -> 6619 bytes
 .../data/raster/vya_T21HUB_992_1024_4352_4384.tif  | Bin 0 -> 7689 bytes
 core/src/test/resources/raster/test1.tiff          | Bin 0 -> 174803 bytes
 core/src/test/resources/raster/test2.tiff          | Bin 0 -> 174803 bytes
 docs/api/sql/Constructor.md                        |   2 +-
 docs/api/sql/Raster-loader.md                      | 144 ++++
 docs/api/sql/Raster-operators.md                   | 309 ++++++++
 docs/tutorial/raster.md                            |  19 +
 mkdocs.yml                                         |  15 +-
 pom.xml                                            |  13 +
 python-adapter/.gitignore                          |   3 +-
 python-adapter/pom.xml                             |  13 +
 spark-version-converter.py                         |   3 +-
 ...org.apache.spark.sql.sources.DataSourceRegister |   1 +
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |  31 +-
 .../sql/sedona_sql/expressions/Functions.scala     |   3 +-
 .../sedona_sql/expressions/raster/Functions.scala  | 865 ++++++++++++++++++++
 .../sql/sedona_sql/expressions/raster/IO.scala     | 256 ++++++
 .../sql/sedona_sql/io/GeotiffFileFormat.scala      | 104 +++
 .../spark/sql/sedona_sql/io/GeotiffSchema.scala    | 195 +++++
 .../spark/sql/sedona_sql/io/HadoopUtils.scala      | 107 +++
 .../spark/sql/sedona_sql/io/ImageOptions.scala     |  12 +
 .../org/apache/sedona/sql/TestBaseScala.scala      |   1 +
 .../scala/org/apache/sedona/sql/rasterIOTest.scala |  79 ++
 .../org/apache/sedona/sql/rasteralgebraTest.scala  | 208 +++++
 27 files changed, 3244 insertions(+), 12 deletions(-)

diff --git a/.gitignore b/.gitignore
index 4ced234..2eee0df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,4 @@
 /site/
 /.bloop/
 /.metals/
-/.vscode/
\ No newline at end of file
+/.vscode/
diff --git a/binder/ApacheSedonaRaster_1.ipynb b/binder/ApacheSedonaRaster_1.ipynb
new file mode 100644
index 0000000..1220a51
--- /dev/null
+++ b/binder/ApacheSedonaRaster_1.ipynb
@@ -0,0 +1,871 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import findspark\n",
+    "findspark.init()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from IPython.display import display, HTML\n",
+    "from pyspark.sql import SparkSession\n",
+    "from pyspark import StorageLevel\n",
+    "import pandas as pd\n",
+    "from pyspark.sql.types import StructType, StructField,StringType, LongType, IntegerType, DoubleType, ArrayType\n",
+    "from pyspark.sql.functions import regexp_replace\n",
+    "from sedona.register import SedonaRegistrator\n",
+    "from sedona.utils import SedonaKryoRegistrator, KryoSerializer\n",
+    "from pyspark.sql.functions import col, split, expr\n",
+    "from pyspark.sql.functions import udf, lit\n",
+    "from sedona.utils import SedonaKryoRegistrator, KryoSerializer\n",
+    "from pyspark.sql.functions import col, split, expr\n",
+    "from pyspark.sql.functions import udf, lit\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Create Spark Session for application"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "spark = SparkSession.\\\n",
+    "    builder.\\\n",
+    "    master(\"local[*]\").\\\n",
+    "    appName(\"Demo-app\").\\\n",
+    "    config(\"spark.serializer\", KryoSerializer.getName).\\\n",
+    "    config(\"spark.kryo.registrator\", SedonaKryoRegistrator.getName) .\\\n",
+    "    config(\"spark.executor.cores\", 3) .\\\n",
+    "    config(\"spark.driver.memory\", \"4G\") .\\\n",
+    "    config(\"spark.kryoserializer.buffer.max.value\", \"4096\") .\\\n",
+    "    config(\"spark.sql.crossJoin.enabled\", \"true\") .\\\n",
+    "    getOrCreate()\n",
+    "\n",
+    "SedonaRegistrator.registerAll(spark)\n",
+    "sc = spark.sparkContext\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Geotiff Loader \n",
+    "\n",
+    "1. Loader takes as input a path to directory which contains geotiff files or a parth to particular geotiff file\n",
+    "2. Loader will read geotiff image in a struct named image which contains multiple fields as shown in the schema below which can be extracted using spark SQL"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Path to directory of geotiff images \n",
+    "DATA_DIR = \"./data/raster/\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {
+    "scrolled": true
+   },
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "root\n",
+      " |-- image: struct (nullable = true)\n",
+      " |    |-- origin: string (nullable = true)\n",
+      " |    |-- Geometry: geometry (nullable = true)\n",
+      " |    |-- height: integer (nullable = true)\n",
+      " |    |-- width: integer (nullable = true)\n",
+      " |    |-- nBands: integer (nullable = true)\n",
+      " |    |-- data: array (nullable = true)\n",
+      " |    |    |-- element: double (containsNull = true)\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "df = spark.read.format(\"geotiff\").option(\"dropInvalid\",True).load(DATA_DIR)\n",
+    "df.printSchema()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+      "|              origin|                Geom|height|width|                data|bands|\n",
+      "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+      "|file:///home/hp/D...|POLYGON ((-58.702...|    32|   32|[1081.0, 1068.0, ...|    4|\n",
+      "|file:///home/hp/D...|POLYGON ((-58.286...|    32|   32|[1151.0, 1141.0, ...|    4|\n",
+      "+--------------------+--------------------+------+-----+--------------------+-----+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "df = df.selectExpr(\"image.origin as origin\",\"image.Geometry as Geom\", \"image.height as height\", \"image.width as width\", \"image.data as data\", \"image.nBands as bands\")\n",
+    "df.show(5)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Extract a particular band from geotiff dataframe using RS_GetBand()\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+      "|                Geom|               Band1|               Band2|               Band3|               Band4|\n",
+      "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+      "|POLYGON ((-58.702...|[1081.0, 1068.0, ...|[865.0, 1084.0, 1...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|\n",
+      "|POLYGON ((-58.286...|[1151.0, 1141.0, ...|[1197.0, 1163.0, ...|[0.0, 0.0, 0.0, 0...|[0.0, 0.0, 0.0, 0...|\n",
+      "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_GetBand() will fetch a particular band from given data array which is the concatination of all the bands'''\n",
+    "\n",
+    "df = df.selectExpr(\"Geom\",\"RS_GetBand(data, 1,bands) as Band1\",\"RS_GetBand(data, 2,bands) as Band2\",\"RS_GetBand(data, 3,bands) as Band3\", \"RS_GetBand(data, 4,bands) as Band4\")\n",
+    "df.createOrReplaceTempView(\"allbands\")\n",
+    "df.show(5)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Map Algebra operations on band values "
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 8,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|            normDiff|\n",
+      "+--------------------+\n",
+      "|[-0.11, 0.01, 0.0...|\n",
+      "|[0.02, 0.01, -0.0...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_NormalizedDifference can be used to calculate NDVI for a particular geotiff image since it uses same computational formula as ndvi'''\n",
+    "\n",
+    "NomalizedDifference = df.selectExpr(\"RS_NormalizedDifference(Band1, Band2) as normDiff\")\n",
+    "NomalizedDifference.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+-------+\n",
+      "|   mean|\n",
+      "+-------+\n",
+      "|1153.85|\n",
+      "|1293.77|\n",
+      "+-------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_Mean() can used to calculate mean of piel values in a particular spatial band '''\n",
+    "meanDF = df.selectExpr(\"RS_Mean(Band1) as mean\")\n",
+    "meanDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+----------------+\n",
+      "|            mode|\n",
+      "+----------------+\n",
+      "| [1011.0, 927.0]|\n",
+      "|[1176.0, 1230.0]|\n",
+      "+----------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "\"\"\" RS_Mode() is used to calculate mode in an array of pixels and returns a array of double with size 1 in case of unique mode\"\"\"\n",
+    "modeDF = df.selectExpr(\"RS_Mode(Band1) as mode\")\n",
+    "modeDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|         greaterthan|\n",
+      "+--------------------+\n",
+      "|[1.0, 1.0, 1.0, 0...|\n",
+      "|[1.0, 1.0, 1.0, 1...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_GreaterThan() is used to mask all the values with 1 which are greater than a particular threshold'''\n",
+    "greaterthanDF = spark.sql(\"Select RS_GreaterThan(Band1,1000.0) as greaterthan from allbands\")\n",
+    "greaterthanDF.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 15,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|    greaterthanEqual|\n",
+      "+--------------------+\n",
+      "|[1.0, 1.0, 1.0, 1...|\n",
+      "|[1.0, 1.0, 1.0, 1...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_GreaterThanEqual() is used to mask all the values with 1 which are greater than a particular threshold'''\n",
+    "\n",
+    "greaterthanEqualDF = spark.sql(\"Select RS_GreaterThanEqual(Band1,360.0) as greaterthanEqual from allbands\")\n",
+    "greaterthanEqualDF.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 16,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|            lessthan|\n",
+      "+--------------------+\n",
+      "|[0.0, 0.0, 0.0, 1...|\n",
+      "|[0.0, 0.0, 0.0, 0...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_LessThan() is used to mask all the values with 1 which are less than a particular threshold'''\n",
+    "lessthanDF = spark.sql(\"Select RS_LessThan(Band1,1000.0) as lessthan from allbands\")\n",
+    "lessthanDF.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 17,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|       lessthanequal|\n",
+      "+--------------------+\n",
+      "|[1.0, 1.0, 1.0, 1...|\n",
+      "|[1.0, 1.0, 1.0, 1...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_LessThanEqual() is used to mask all the values with 1 which are less than equal to a particular threshold'''\n",
+    "lessthanEqualDF = spark.sql(\"Select RS_LessThanEqual(Band1,2890.0) as lessthanequal from allbands\")\n",
+    "lessthanEqualDF.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 18,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|           sumOfBand|\n",
+      "+--------------------+\n",
+      "|[1946.0, 2152.0, ...|\n",
+      "|[2348.0, 2304.0, ...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_AddBands() can add two spatial bands together'''\n",
+    "sumDF = df.selectExpr(\"RS_AddBands(Band1, Band2) as sumOfBand\")\n",
+    "sumDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 19,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|          diffOfBand|\n",
+      "+--------------------+\n",
+      "|[-216.0, 16.0, 11...|\n",
+      "|[46.0, 22.0, -96....|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_SubtractBands() can subtract two spatial bands together'''\n",
+    "subtractDF = df.selectExpr(\"RS_SubtractBands(Band1, Band2) as diffOfBand\")\n",
+    "subtractDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|       productOfBand|\n",
+      "+--------------------+\n",
+      "|[935065.0, 115771...|\n",
+      "|[1377747.0, 13269...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_MultiplyBands() can multiple two bands together'''\n",
+    "multiplyDF = df.selectExpr(\"RS_MultiplyBands(Band1, Band2) as productOfBand\")\n",
+    "multiplyDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 21,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|      divisionOfBand|\n",
+      "+--------------------+\n",
+      "|[1.25, 0.99, 0.9,...|\n",
+      "|[0.96, 0.98, 1.08...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_DivideBands() can divide two bands together'''\n",
+    "divideDF = df.selectExpr(\"RS_DivideBands(Band1, Band2) as divisionOfBand\")\n",
+    "divideDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 22,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|              target|\n",
+      "+--------------------+\n",
+      "|[1730.0, 2168.0, ...|\n",
+      "|[2394.0, 2326.0, ...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_MultiplyFactor() will multiply a factor to a spatial band'''\n",
+    "mulfacDF = df.selectExpr(\"RS_MultiplyFactor(Band2, 2) as target\")\n",
+    "mulfacDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 23,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|                 AND|\n",
+      "+--------------------+\n",
+      "|[33.0, 1068.0, 10...|\n",
+      "|[1069.0, 1025.0, ...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_BitwiseAND() will return AND between two values of Bands'''\n",
+    "bitwiseAND = df.selectExpr(\"RS_BitwiseAND(Band1, Band2) as AND\")\n",
+    "bitwiseAND.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 24,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|                  OR|\n",
+      "+--------------------+\n",
+      "|[1913.0, 1084.0, ...|\n",
+      "|[1279.0, 1279.0, ...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_BitwiseOR() will return OR between two values of Bands'''\n",
+    "bitwiseOR = df.selectExpr(\"RS_BitwiseOR(Band1, Band2) as OR\")\n",
+    "bitwiseOR.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 25,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+-----+\n",
+      "|count|\n",
+      "+-----+\n",
+      "|  753|\n",
+      "| 1017|\n",
+      "+-----+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_Count() will calculate the total number of occurence of a target value'''\n",
+    "countDF = df.selectExpr(\"RS_Count(RS_GreaterThan(Band1,1000.0), 1.0) as count\")\n",
+    "countDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 26,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|              modulo|\n",
+      "+--------------------+\n",
+      "|[10.0, 18.0, 18.0...|\n",
+      "|[17.0, 7.0, 2.0, ...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_Modulo() will calculate the modulus of band value with respect to a given number'''\n",
+    "moduloDF = df.selectExpr(\"RS_Modulo(Band1, 21.0) as modulo \")\n",
+    "moduloDF.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 27,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|                root|\n",
+      "+--------------------+\n",
+      "|[32.88, 32.68, 32...|\n",
+      "|[33.93, 33.78, 35...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_SquareRoot() will calculate calculate square root of all the band values upto two decimal places'''\n",
+    "rootDF = df.selectExpr(\"RS_SquareRoot(Band1) as root\")\n",
+    "rootDF.show(5)\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 28,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|      loggDifference|\n",
+      "+--------------------+\n",
+      "|[1081.0, 1068.0, ...|\n",
+      "|[1151.0, 1141.0, ...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_LogicalDifference() will return value from band1 if value at that particular location is not equal tp band1 else it will return 0'''\n",
+    "logDiff = df.selectExpr(\"RS_LogicalDifference(Band1, Band2) as loggDifference\")\n",
+    "logDiff.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 29,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+\n",
+      "|         logicalOver|\n",
+      "+--------------------+\n",
+      "|[865.0, 1084.0, 1...|\n",
+      "|[1197.0, 1163.0, ...|\n",
+      "+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' RS_LogicalOver() will iterate over two bands and return value of first band if it is not equal to 0 else it will return value from later band'''\n",
+    "logOver = df.selectExpr(\"RS_LogicalOver(Band3, Band2) as logicalOver\")\n",
+    "logOver.show(5)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Visualising Geotiff Images\n",
+    "\n",
+    "1. Normalize the bands in range [0-255] if values are greater than 255\n",
+    "2. Process image using RS_Base64() which converts in into a base64 string\n",
+    "3. Embedd results of RS_Base64() in RS_HTML() to embedd into IPython notebook\n",
+    "4. Process results of RS_HTML() as below:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+      "|                Geom|             RedBand|            BlueBand|           GreenBand|        CombinedBand|\n",
+      "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+      "|POLYGON ((-58.702...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|\n",
+      "|POLYGON ((-58.286...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|<img src=\"data:im...|\n",
+      "+--------------------+--------------------+--------------------+--------------------+--------------------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' Plotting images as a dataframe using geotiff Dataframe.'''\n",
+    "\n",
+    "df = spark.read.format(\"geotiff\").option(\"dropInvalid\",True).load(DATA_DIR)\n",
+    "df = df.selectExpr(\"image.origin as origin\",\"image.Geometry as Geom\", \"image.height as height\", \"image.width as width\", \"image.data as data\", \"image.nBands as bands\")\n",
+    "\n",
+    "df = df.selectExpr(\"RS_GetBand(data,1,bands) as targetband\", \"height\", \"width\", \"bands\", \"Geom\")\n",
+    "df_base64 = df.selectExpr(\"Geom\", \"RS_Base64(height,width,RS_Normalize(targetBand), RS_Array(height*width,0), RS_Array(height*width, 0)) as red\",\"RS_Base64(height,width,RS_Array(height*width, 0), RS_Normalize(targetBand), RS_Array(height*width, 0)) as green\", \"RS_Base64(height,width,RS_Array(height*width, 0),  RS_Array(height*width, 0), RS_Normalize(targetBand)) as blue\",\"RS_Base64(height,width,RS_Normalize(targetBand), RS_Normalize(targetBand),RS_Normalize(targetBand)) as  [...]
+    "df_HTML = df_base64.selectExpr(\"Geom\",\"RS_HTML(red) as RedBand\",\"RS_HTML(blue) as BlueBand\",\"RS_HTML(green) as GreenBand\", \"RS_HTML(RGB) as CombinedBand\")\n",
+    "df_HTML.show(5)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<table border=\"1\" class=\"dataframe\">\n",
+       "  <thead>\n",
+       "    <tr style=\"text-align: right;\">\n",
+       "      <th></th>\n",
+       "      <th>Geom</th>\n",
+       "      <th>RedBand</th>\n",
+       "      <th>BlueBand</th>\n",
+       "      <th>GreenBand</th>\n",
+       "      <th>CombinedBand</th>\n",
+       "    </tr>\n",
+       "  </thead>\n",
+       "  <tbody>\n",
+       "    <tr>\n",
+       "      <th>0</th>\n",
+       "      <td>POLYGON ((-58.70271939504448 -34.41877544555479, -58.70277605822864 -34.42156988068061, -58.6994039180242 -34.42161679331493, -58.69934736692278 -34.4188223533111, -58.70271939504448 -34.41877544555479))</td>\n",
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFGElEQVR42l2X2W4jRRSG+zVhEu9b2+6O492xE8eJE8eZZLIxZNaISZgJIAHSSGwSFyziDoRAvABPgMQVQkhDneg70q+5+F3d5aqz/Gep6qgVRW/WAhoBpYA6Y5XnWkCRNb2AecCDgIuA44A44LOAPwM+DeiwZxU5hnxAhTGPbJsvB0Td8DNCuG3eFMWTAPs/hwH7rH0d8DTgVcAQPAm4ClgieAXDDRmQR1aeeTM+Mk8GojTBkHWQsNCepwGLgK2AMc87KP084N+A31ifBRnYyPGeYy7rDMyg3wTdBuyhbChGbDCOWechM3YOAt4L+CHgb8Y6jC6g2z1eQbmPZkw0QKCN3yDQvD5BiC38ACVbGLZLOM4ZzYlDDDWvUgy9YH0snq9iQAl5k [...]
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFHklEQVR42l2X2W4jRRSG/ZowifetbbfttPexY8exsziTTBYzBGaJSMJMAIlBGolN4oJF3IEQiBfgCZC4QgjJVDXf8fyai9/VXa46y3+Wqk4kEs1VItFwqDrkHSqMJZ7LDjnWdBzmDg8dlg4nDoHDS4c/HD51aLFnEzkeGYciYwbZfr7gkGi7nwHC/eZtUTxa/f9/GgP2WfvK4YnDC4c+eOxw5bBA8AaGeyRBBlkZ5r3xsSc9URpiyBYIWeifJw4HDmOHIc+7KP3M4R+HX1mfAknYSPOeZi5lDEyh3wu6c9hDWV+MuM84ZJ2FzLNz6PCOw/cOfzFWYPQAus3jDZTb6I2JvW/AwtcI9F6fIsQv/AAlYwybEY4LRu/EEYZ6r+oYumR9IJ5vY [...]
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFGElEQVR42l2X2W4jRRSG/ZpA4i3euu1uO+09duw4ceI4k0w2hswaMQlDAIlBGolN4oJF3IEQiBfgCZC4QghpqGq+Y/2ai9/VXa46y3+Wqs5kWpnXmaZD3aHkEDJWea45bLCm6zB3uO9w6XDiEDi8dPjD4VOHNnvWkeNRcKgwFpDt58sOmY77GSLcb94WxWMH/38eAw5Y+8rhicOHDgPw2OHaYYngNQz3yIICsgrMe+NTT/qiNMKQTRCx0D9PHRYOE4cRz7so/czhH4dfWZ8DWdjI855nLmcMzKDfC7pz2EfZQIzYYhyxzkLm2Tl0eNfhe4e/GEMYXUC3ebyGchu9Man3TVj4GoHe61OE+IXvo2SCYXuE44LRO3GEod6rGEMvWR+I5+sYU [...]
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAGOElEQVR42k3XWUtXURQF8Ps1s3kwLRvNBm3O5kEtK0srmq3AgqAJemigtyIKv0CfIPBJJDjx27BE4Xj/99xz915r7eGc2+3Zs6ft3r27DQwMtC1btrTt27fXta+vr3739/e3zZs315r9+/e306dPt7GxsTY5OdmuXLnStm3b1l6+fNn+/PnT5ubm2r59++qdtWvXlh1j48aNbevWrXU12Dbf29vbuqGhoTY8PFzGvXz06NFlx0eOHGmeb9iwoQCcPXu21r5+/brNzMy0p0+ftkOHDtWYnp5u9+7daxcuXCjDa9asKeDGunXranDOlqt54DtMDh48uOx0586dBWTv3r013Fvo9/Hjx9u5c+fasWPH2uHDh+v3qVOnyumrV6/a0tJS+/XrV [...]
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>1</th>\n",
+       "      <td>POLYGON ((-58.28663657626114 -34.75858090620287, -58.28667994174946 -34.76137571668496, -58.28329340123002 -34.76141146033393, -58.28325014980316 -34.75861664615162, -58.28663657626114 -34.75858090620287))</td>\n",
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAE+UlEQVR42l2XS4/bVBTH/R0p08nDieOx4zhO7LxfM5lH0xnoY9qhtKgMagEJVd0hIbaw4FuwR2zY8QXKvdHvoL+yOLF977nn/M/7JoiD4FPHUelo4Shx9NBR31HlqO3I8zQcHTtqOooctXgv2D9iP2Yt4sxDqIGsnaO/HL1ylDsKUgCkgMgR3GYtFiFeQR3lkfCMHPXg9We7Yog/EwIgRMczzpx4AP5n4miA1TnUEvKHP3P0AEsNkFc0dnTr6AVgauyF4oUO3976P7De8w49gDkAEhDFAEhBn/B+JACOAeQ9cO7oFMtCeI4B0YLfQNw7+hUA3uAbDUEf97UAMCGWU/bash/KM5LnhLMNqH4QzgGhWiDXey2owdjHpRFCvGc2vO842IGar [...]
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAE/ElEQVR42l2XyXLcVBSG+x0JjntQS62WWq0epJ4nu2M7HZuExCEJQwVTCVBFUeyootjCgrdgT7FhxwuYe5XvmL+8OC3p3jP8Z7y3a7VafFurdRwVjlaOEkcPHQ0clY7C2w88TUfHjlqOIkdt3ofsH7EfsxYh8xBqouvg6C9HbxzljmopAFJA5CgOWYtFiTfQwHgkPBNHfXi9bE8c8TIBAAJsPEem6wH4n5mjEV7nUFvIC3/k6AGeGiBvaOro2tFLwNTZCyQKHb6993/gvecdewBLACQgigGQgj7h/UgAHAPIR+CRoxM8C+A5BkQbfgNx4+hXAHiHrzQFA8LXBsCMXM7ZC2U/kGckzxmyTahxL50jUrVCr49aFbIGRnoo8kp8ZHa8HxDsQ [...]
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAE+ElEQVR42l2XyW7bVhSG9Y5NHWugSFGkKGogNU+2YjuK3aSJ3SQdkLpomgBFkF2Bott20bfovuimu75Aei/xHeOHF0ck7z3Df8Z7VavFtU+1jqPC0cpR4uiho4Gj0lHoyPM0HR07ajmKHLV5H7J/xH7MWoTMQ6iJroOjvx29dpQ7qqUASAGRozhkLRYl3kAD45HwTBz14fWyPXHEywQACLDxHJmuB+B/Zo5GeJ1DbSEv/JmjB3hqgLyhqaNrRzeAqbMXSBQ6fHvv/8R7zzv2AJYASEAUAyAFfcL7kQA4BpCPwCNHJ3gWwHMMiDb8BuLW0W8A8A5faQoGhK8NgBm5nLMXyn4gz0ieM2SbUONeOkekaoVeH7UqZA2M9FDklfjI7Hg/INiBW [...]
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAF7klEQVR42k3XyU5WSxQF4POOKCooSmODqKggNoCCiA2tghqUAJIY44zEMNWBb8HcOGHmC9TNt5PFhaSoU1W7Wbut+ruhoaE2ODjY7ty506amptrIyEg7d+5cu3nzZhsfH2+XL19uaPr7+9v58+fbxYsX25UrV9rAwEB9j42N1Xlvb2+d+7aHBg9Zhm+yFhYW2p8/f9rGxkYbHR1t3dWrVwuAGQibBCO2R2CEUNDX11fKKQjN3bt3240bN4oW77Vr104MwXPp0qUCYKZjeXm5eIaHh1vn3/3799utW7fKagAMSjIw9/T0tDNnzpSlAUTRvXv32urqaltbWyswFy5cqDPK4gUGWrP+9+/fZT3a27dvt+7BgwcFAGJgWAEAAujt+6Y4AOwDx [...]
+       "    </tr>\n",
+       "  </tbody>\n",
+       "</table>"
+      ],
+      "text/plain": [
+       "<IPython.core.display.HTML object>"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    }
+   ],
+   "source": [
+    "display(HTML(df_HTML.limit(2).toPandas().to_html(escape=False)))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# User can also create some UDF manually to manipulate Geotiff dataframes"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 7,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+------+\n",
+      "|   sum|\n",
+      "+------+\n",
+      "| 753.0|\n",
+      "|1017.0|\n",
+      "+------+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "''' Sample UDF calculates sum of all the values in a band which are greater than 1000.0 '''\n",
+    "\n",
+    "def SumOfValues(band):\n",
+    "    total = 0.0\n",
+    "    for num in band:\n",
+    "        if num>1000.0:\n",
+    "            total+=1\n",
+    "    return total\n",
+    "    \n",
+    "calculateSum = udf(SumOfValues, DoubleType())\n",
+    "spark.udf.register(\"RS_Sum\", calculateSum)\n",
+    "\n",
+    "sumDF = df.selectExpr(\"RS_Sum(targetband) as sum\")\n",
+    "sumDF.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<table border=\"1\" class=\"dataframe\">\n",
+       "  <thead>\n",
+       "    <tr style=\"text-align: right;\">\n",
+       "      <th></th>\n",
+       "      <th>Geom</th>\n",
+       "      <th>selectedregion</th>\n",
+       "    </tr>\n",
+       "  </thead>\n",
+       "  <tbody>\n",
+       "    <tr>\n",
+       "      <th>0</th>\n",
+       "      <td>POLYGON ((-58.70271939504448 -34.41877544555479, -58.70277605822864 -34.42156988068061, -58.6994039180242 -34.42161679331493, -58.69934736692278 -34.4188223533111, -58.70271939504448 -34.41877544555479))</td>\n",
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFlklEQVR42l2Xa28bRRSG529CXV/XG6/t9S1rx/c4iZPGaS6N3RBK2oiGSwCJVqpEAYkPBcQ3EALxB/gFSHxCCGk5M/OMM+qH1zO73p1zznvec2ZWtZRSTUFNEAiqjBvMK4ISz3QFc6XSU8FKcCKIBC8Efwq+FGwK5J30viAABUEo7xbABjbKApUIBiwuL6uJZ3jM/3kcOLDPpq8EV4JPBX3wgeBasBDIwmlGUAJZC2M8z6iDirQDEona8ozGONIGMQ/q+bbggSw2FYwEer6L0ZeCfwW/wUoOZGFDG87hQJa5YWAH+mUhdSvYx1jfc2LIOLLPpcJGKu+kwk56KHgs+EHwN6MEk3ZxcIMU6IgzGHejdsZE32T8VnBI1GekRT/4oUCMqKl1L [...]
+       "    </tr>\n",
+       "    <tr>\n",
+       "      <th>1</th>\n",
+       "      <td>POLYGON ((-58.28663657626114 -34.75858090620287, -58.28667994174946 -34.76137571668496, -58.28329340123002 -34.76141146033393, -58.28325014980316 -34.75861664615162, -58.28663657626114 -34.75858090620287))</td>\n",
+       "      <td><img src=\"data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAgCAYAAABzenr0AAAFfklEQVR42k2XSXPbRhCF8R/tSOJOQiRBECTBTaRILdZiybJjKbacpGynvKQqlcotVSlfk0P+Re6pXHLLH0C6e74R5vAEgFj69evXPaMojqKoI5gIVoKuYFcwFOSClkCfqQn2BPUoKtqCpkDPM4HcL3YEe5xnPCPvFLugxrcuBX8L7gWpIOpBoAeJ1AWxh3sE3wVKoEpwDSDPFPJMMRUMCK6k+oIugZVUAwINYnwtkHei/Yg/c8GIrFPQDKAvPxI8FuwEWWmgmeBWcAeZCvcagQodd23Z/0n2mtxYCRxAoAuZGAI9Mu5yvhMQ0KweocCp4EgwIehj7u+i1E5JInov+AIBTfg6LMEQ6ZsQUFJSy2jBvVZ53wI1gyz9Ud4pUgIqqpSEUlnQA [...]
+       "    </tr>\n",
+       "  </tbody>\n",
+       "</table>"
+      ],
+      "text/plain": [
+       "<IPython.core.display.HTML object>"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    }
+   ],
+   "source": [
+    "''' Sample UDF to visualize a particular region of a GeoTiff image'''\n",
+    "\n",
+    "def generatemask(band, width,height):\n",
+    "    for (i,val) in enumerate(band):\n",
+    "        if (i%width>=12 and i%width<26) and (i%height>=12 and i%height<26):\n",
+    "            band[i] = 255.0\n",
+    "        else:\n",
+    "            band[i] = 0.0\n",
+    "    return band\n",
+    "\n",
+    "maskValues = udf(generatemask, ArrayType(DoubleType()))\n",
+    "spark.udf.register(\"RS_MaskValues\", maskValues)\n",
+    "\n",
+    "\n",
+    "df_base64 = df.selectExpr(\"Geom\", \"RS_Base64(height,width,RS_Normalize(targetband), RS_Array(height*width,0), RS_Array(height*width, 0), RS_MaskValues(targetband,width,height)) as region\" )\n",
+    "df_HTML = df_base64.selectExpr(\"Geom\",\"RS_HTML(region) as selectedregion\")\n",
+    "display(HTML(df_HTML.limit(2).toPandas().to_html(escape=False)))\n"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "apache-sedona",
+   "language": "python",
+   "name": "apache-sedona"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.7.5"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/binder/data/raster/T21HUB_4704_4736_8224_8256.tif b/binder/data/raster/T21HUB_4704_4736_8224_8256.tif
new file mode 100644
index 0000000..776d753
Binary files /dev/null and b/binder/data/raster/T21HUB_4704_4736_8224_8256.tif differ
diff --git a/binder/data/raster/vya_T21HUB_992_1024_4352_4384.tif b/binder/data/raster/vya_T21HUB_992_1024_4352_4384.tif
new file mode 100644
index 0000000..6df2b0a
Binary files /dev/null and b/binder/data/raster/vya_T21HUB_992_1024_4352_4384.tif differ
diff --git a/core/src/test/resources/raster/test1.tiff b/core/src/test/resources/raster/test1.tiff
new file mode 100644
index 0000000..bebd682
Binary files /dev/null and b/core/src/test/resources/raster/test1.tiff differ
diff --git a/core/src/test/resources/raster/test2.tiff b/core/src/test/resources/raster/test2.tiff
new file mode 100644
index 0000000..bebd682
Binary files /dev/null and b/core/src/test/resources/raster/test2.tiff differ
diff --git a/docs/api/sql/Constructor.md b/docs/api/sql/Constructor.md
index d456d1f..9544488 100644
--- a/docs/api/sql/Constructor.md
+++ b/docs/api/sql/Constructor.md
@@ -184,4 +184,4 @@ Spark SQL example:
 SELECT *
 FROM pointdf
 WHERE ST_Contains(ST_PolygonFromEnvelope(1.0,100.0,1000.0,1100.0), pointdf.pointshape)
-```
+```
\ No newline at end of file
diff --git a/docs/api/sql/Raster-loader.md b/docs/api/sql/Raster-loader.md
new file mode 100644
index 0000000..f4197b1
--- /dev/null
+++ b/docs/api/sql/Raster-loader.md
@@ -0,0 +1,144 @@
+### GeoTiff DataFrame Loader
+
+Introduction: The GeoTiff loader of Sedona is a Spark built-in data source. It can read a single GeoTiff image or
+a directory of GeoTiff images into a DataFrame.
+
+Since: `v1.1.0`
+
+Spark SQL example:
+
+The input path can be a path to a single GeoTiff image or a directory of GeoTiff images.
+You can optionally append an option to drop invalid images. The bounding geometry of each image is automatically loaded
+as a Sedona geometry and transformed to the WGS84 (EPSG:4326) reference system.
+
+```Scala
+var geotiffDF = sparkSession.read.format("geotiff").option("dropInvalid", true).load("YOUR_PATH")
+geotiffDF.printSchema()
+```
+
+Output:
+
+```html
+ |-- image: struct (nullable = true)
+ |    |-- origin: string (nullable = true)
+ |    |-- Geometry: geometry (nullable = true)
+ |    |-- height: integer (nullable = true)
+ |    |-- width: integer (nullable = true)
+ |    |-- nBands: integer (nullable = true)
+ |    |-- data: array (nullable = true)
+ |    |    |-- element: double (containsNull = true)
+```
+
+You can also select sub-attributes individually to construct a new DataFrame.
+
+```Scala
+geotiffDF = geotiffDF.selectExpr("image.origin as origin","image.Geometry as Geom", "image.height as height", "image.width as width", "image.data as data", "image.nBands as bands")
+geotiffDF.createOrReplaceTempView("GeotiffDataframe")
+geotiffDF.show()
+```
+
+Output:
+
+```html
++--------------------+--------------------+------+-----+--------------------+-----+
+|              origin|                Geom|height|width|                data|bands|
++--------------------+--------------------+------+-----+--------------------+-----+
+|file:///home/hp/D...|POLYGON ((-58.699...|    32|   32|[1058.0, 1039.0, ...|    4|
+|file:///home/hp/D...|POLYGON ((-58.297...|    32|   32|[1258.0, 1298.0, ...|    4|
++--------------------+--------------------+------+-----+--------------------+-----+
+```
+
+## RS_GetBand
+
+Introduction: Return a particular band from a GeoTiff DataFrame. Band indexes start at 1.
+
+The total number of bands can be obtained from the GeoTiff loader (the `nBands` field)
+
+Format: `RS_GetBand (allBandValues: Array[Double], targetBand:Int, totalBands:Int)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+
+```Scala
+val BandDF = spark.sql("select RS_GetBand(data, 2, bands) as targetBand from GeotiffDataframe")
+BandDF.show()
+```
+
+Output:
+
+```html
++--------------------+
+|          targetBand|
++--------------------+
+|[1058.0, 1039.0, ...|
+|[1258.0, 1298.0, ...|
++--------------------+
+```
+
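+To work with several bands at once, you can extract each band and register a temporary view, as done in the bundled notebook:
+
+```Scala
+// Extract bands 1 and 2 and expose them through a temporary view.
+val allBands = geotiffDF.selectExpr("Geom", "RS_GetBand(data, 1, bands) as Band1", "RS_GetBand(data, 2, bands) as Band2")
+allBands.createOrReplaceTempView("allbands")
+```
+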
+## RS_Array
+
+Introduction: Create an array that is filled with the given value
+
+Format: `RS_Array(length:Int, value: Decimal)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+
+```SQL
+SELECT RS_Array(height * width, 0.0)
+```
+
+## RS_Base64
+
+Introduction: Return a Base64 string from a GeoTiff image
+
+Format: `RS_Base64 (height:Int, width:Int, redBand: Array[Double], greenBand: Array[Double], blueBand: Array[Double], 
+optional: alphaBand: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+val BandDF = spark.sql("select RS_Base64(h, w, band1, band2, RS_Array(h*w, 0)) as baseString from dataframe")
+BandDF.show()
+```
+
+Output:
+
+```html
++--------------------+
+|          baseString|
++--------------------+
+|QJCIAAAAAABAkDwAA...|
+|QJOoAAAAAABAlEgAA...|
++--------------------+
+```
+
+!!!note
+	Although the 3 RGB bands are mandatory, you can use [RS_Array(h*w, 0.0)](#rs_array) to create an array (zeroed out, size = h * w) as input.
+
+## RS_HTML
+
+Introduction: Return an HTML img tag with the Base64 string embedded
+
+Format: `RS_HTML(base64:String, optional: width_in_px:String)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+
+```Scala
+df.selectExpr("RS_HTML(encodedstring, '300') as htmlstring" ).show()
+```
+
+Output:
+
+```html
++--------------------+
+|          htmlstring|
++--------------------+
+|<img src="data:im...|
+|<img src="data:im...|
++--------------------+
+```
+
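+A typical visualization pipeline chains the loader, RS_GetBand, RS_Normalize, RS_Base64 and RS_HTML together. A minimal Scala sketch, assembled from the examples above (the zeroed green and blue channels are produced with RS_Array):
+
+```Scala
+// Render band 1 of each image as an HTML <img> tag (red channel only).
+var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load("YOUR_PATH")
+df = df.selectExpr("image.height as height", "image.width as width", "image.data as data", "image.nBands as bands")
+df = df.selectExpr("RS_Base64(height, width, RS_Normalize(RS_GetBand(data, 1, bands)), RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as red")
+df.selectExpr("RS_HTML(red) as htmlstring").show()
+```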
diff --git a/docs/api/sql/Raster-operators.md b/docs/api/sql/Raster-operators.md
new file mode 100644
index 0000000..a401ba0
--- /dev/null
+++ b/docs/api/sql/Raster-operators.md
@@ -0,0 +1,309 @@
+## RS_AddBands
+
+Introduction: Add two spectral bands in a GeoTiff image
+
+Format: `RS_AddBand (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val sumDF = spark.sql("select RS_AddBands(band1, band2) as sumOfBands from dataframe")
+
+```
+
+## RS_SubtractBands
+
+Introduction: Subtract two spectral bands in a GeoTiff image (band2 - band1)
+
+Format: `RS_SubtractBands (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val subtractDF = spark.sql("select RS_SubtractBands(band1, band2) as differenceOfBands from dataframe")
+
+```
+
+## RS_MultiplyBands
+
+Introduction: Multiply two spectral bands in a Geotiff image
+
+Format: `RS_MultiplyBands (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val multiplyDF = spark.sql("select RS_MultiplyBands(band1, band2) as multiplyBands from dataframe")
+
+```
+
+## RS_DivideBands
+
+Introduction: Divide band1 by band2 in a GeoTiff image
+
+Format: `RS_DivideBands (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val multiplyDF = spark.sql("select RS_DivideBands(band1, band2) as divideBands from dataframe")
+
+```
+
+## RS_MultiplyFactor
+
+Introduction: Multiply a spectral band in a GeoTiff image by a factor
+
+Format: `RS_MultiplyFactor (Band1: Array[Double], Factor: Int)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val multiplyFactorDF = spark.sql("select RS_MultiplyFactor(band1, 2) as multiplyfactor from dataframe")
+
+```
+
+## RS_Mode
+
+Introduction: Returns the mode of a spectral band in a GeoTiff image in the form of an array
+
+Format: `RS_Mode (Band: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val modeDF = spark.sql("select RS_Mode(band) as mode from dataframe")
+
+```
+
+## RS_Mean
+
+Introduction: Returns the mean value of a spectral band in a GeoTiff image
+
+Format: `RS_Mean (Band: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val meanDF = spark.sql("select RS_Mean(band) as mean from dataframe")
+
+```
+
+## RS_NormalizedDifference
+
+Introduction: Returns the normalized difference between two bands (band2 and band1) in a GeoTiff image (example: NDVI, NDBI)
+
+Format: `RS_NormalizedDifference (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val normalizedDF = spark.sql("select RS_NormalizedDifference(band1, band2) as normdifference from dataframe")
+
+```
+
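+The per-pixel formula, as implemented in this commit, replaces zero pixels with -1 and rounds the result to two decimals:
+
+```Scala
+// Minimal sketch of the per-pixel computation behind RS_NormalizedDifference.
+def normalizedDifference(band1: Array[Double], band2: Array[Double]): Array[Double] =
+  band1.zip(band2).map { case (v1, v2) =>
+    val b1 = if (v1 == 0) -1.0 else v1
+    val b2 = if (v2 == 0) -1.0 else v2
+    math.round((b2 - b1) / (b2 + b1) * 100) / 100.0
+  }
+```
+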
+## RS_Count
+
+Introduction: Returns the count of a particular value in a spectral band of a raster image
+
+Format: `RS_Count (Band1: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val countDF = spark.sql("select RS_Count(band1, target) as count from dataframe")
+
+```
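+
+RS_Count composes with the masking functions; for example, counting pixels above a threshold, as in the bundled notebook:
+
+```Scala
+// Count how many pixels in band1 exceed 1000.0.
+val countDF = spark.sql("select RS_Count(RS_GreaterThan(band1, 1000.0), 1.0) as count from dataframe")
+```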
+
+## RS_GreaterThan
+
+Introduction: Mask with 1 all the values which are greater than a particular target value
+
+Format: `RS_GreaterThan (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val greaterDF = spark.sql("select RS_GreaterThan(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_GreaterThanEqual
+
+Introduction: Mask with 1 all the values which are greater than or equal to a particular target value
+
+Format: `RS_GreaterThanEqual (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val greaterEqualDF = spark.sql("select RS_GreaterThanEqual(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_LessThan
+
+Introduction: Mask with 1 all the values which are less than a particular target value
+
+Format: `RS_LessThan (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val lessDF = spark.sql("select RS_LessThan(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_LessThanEqual
+
+Introduction: Mask with 1 all the values which are less than or equal to a particular target value
+
+Format: `RS_LessThanEqual (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val lessEqualDF = spark.sql("select RS_LessThanEqual(band, target) as maskedvalues from dataframe")
+
+```
+
+## RS_Modulo
+
+Introduction: Find the modulo of pixel values with respect to a particular value
+
+Format: `RS_Modulo (Band: Array[Double], Target: Double)`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val moduloDF = spark.sql("select RS_Modulo(band, target) as modulo from dataframe")
+
+```
+
+## RS_BitwiseAND
+
+Introduction: Find the bitwise AND between two bands of a GeoTiff image
+
+Format: `RS_BitwiseAND (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val bitwiseandDF = spark.sql("select RS_BitwiseAND(band1, band2) as andvalue from dataframe")
+
+```
+
+## RS_BitwiseOR
+
+Introduction: Find the bitwise OR between two bands of a GeoTiff image
+
+Format: `RS_BitwiseOR (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val bitwiseorDF = spark.sql("select RS_BitwiseOR(band1, band2) as orvalue from dataframe")
+
+```
+
+## RS_SquareRoot
+
+Introduction: Find the square root of band values in a GeoTiff image
+
+Format: `RS_SquareRoot (Band: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val rootDF = spark.sql("select RS_SquareRoot(band) as squareroot from dataframe")
+
+```
+
+## RS_LogicalDifference
+
+Introduction: Return the value from band1 if the values in band1 and band2 differ, else return 0
+
+Format: `RS_LogicalDifference (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val logicalDifference = spark.sql("select RS_LogicalDifference(band1, band2) as logdifference from dataframe")
+
+```
+
+## RS_LogicalOver
+
+Introduction: Return the value from band1 if it is not equal to 0, else return the value from band2
+
+Format: `RS_LogicalOver (Band1: Array[Double], Band2: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val logicalOver = spark.sql("select RS_LogicalOver(band1, band2) as logover from dataframe")
+
+```
+
+## RS_FetchRegion
+
+Introduction: Fetch a subset region of a given GeoTiff image based on minimumX, minimumY, maximumX and maximumY indexes, as well as the original height and width of the image
+
+Format: `RS_FetchRegion (Band: Array[Double], coordinates: Array[Int], dimensions: Array[Int])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```Scala
+
+val region = spark.sql("select RS_FetchRegion(Band,Array(0, 0, 1, 2),Array(3, 3)) as Region from dataframe")
+```
+
+## RS_Normalize
+
+Introduction: Normalize the values in the array to [0, 255]
+
+Format: `RS_Normalize (Band: Array[Double])`
+
+Since: `v1.1.0`
+
+Spark SQL example:
+```SQL
+SELECT RS_Normalize(band)
+```
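+
+The exact scaling is defined by the implementation of `RS_Normalize` in this commit; a minimal min-max sketch, assuming linear scaling of each band to [0, 255], would look like:
+
+```Scala
+// Hypothetical min-max scaling sketch; not necessarily the exact RS_Normalize logic.
+def normalize(band: Array[Double]): Array[Double] = {
+  val (lo, hi) = (band.min, band.max)
+  if (hi == lo) band.map(_ => 0.0)
+  else band.map(v => (v - lo) / (hi - lo) * 255)
+}
+```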
diff --git a/docs/tutorial/raster.md b/docs/tutorial/raster.md
new file mode 100644
index 0000000..ca7a346
--- /dev/null
+++ b/docs/tutorial/raster.md
@@ -0,0 +1,19 @@
+# Raster data processing
+
+Starting from `v1.1.0`, Sedona SQL supports raster data sources and raster operators in DataFrame and SQL. Raster support is available in all Sedona language bindings including ==Scala, Java, Python and R==.
+
+## Initial setup
+
+1. [Set up dependencies](../sql/#set-up-dependencies)
+2. [Initiate Spark session](../sql/#initiate-sparksession)
+3. [Register SedonaSQL](../sql/#register-sedonasql); a minimal end-to-end example follows below
+
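+With the setup in place, a minimal Scala sketch, assembled from the loader and operator examples documented in this release, loads GeoTiff images and computes a normalized difference between two bands:
+
+```Scala
+// Load GeoTiff images, extract two bands, and compute their normalized difference.
+var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load("YOUR_PATH")
+df = df.selectExpr("image.data as data", "image.nBands as bands")
+df = df.selectExpr("RS_GetBand(data, 1, bands) as band1", "RS_GetBand(data, 2, bands) as band2")
+df.selectExpr("RS_NormalizedDifference(band1, band2) as normDiff").show()
+```
+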
+## API docs
+
+[IO of raster data in DataFrame](../../api/sql/Raster-loader/)
+
+[Map algebra in DataFrame](../../api/sql/Raster-operators/)
+
+## Tutorials
+
+[Python Jupyter Notebook](https://github.com/apache/incubator-sedona/blob/master/binder/ApacheSedonaRaster_1.ipynb)
diff --git a/mkdocs.yml b/mkdocs.yml
index 38d9df8..4e1f76f 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -22,6 +22,7 @@ nav:
         - Scala/Java: tutorial/sql.md
         - Pure SQL: tutorial/sql-pure-sql.md
         - Python: tutorial/sql-python.md
+        - Raster data: tutorial/raster.md
       - Map visualization SQL app:
         - Scala/Java: tutorial/viz.md
         - Use Apache Zeppelin: tutorial/zeppelin.md
@@ -37,11 +38,15 @@ nav:
         - Python doc: api/python-api.md
       - SQL:
         - Quick start: api/sql/Overview.md
-        - Constructor: api/sql/Constructor.md
-        - Function: api/sql/Function.md
-        - Predicate: api/sql/Predicate.md
-        - Aggregate function: api/sql/AggregateFunction.md
-        - Join query (optimizer): api/sql/Optimizer.md
+        - Vector data:
+            - Constructor: api/sql/Constructor.md
+            - Function: api/sql/Function.md
+            - Predicate: api/sql/Predicate.md
+            - Aggregate function: api/sql/AggregateFunction.md
+            - Join query (optimizer): api/sql/Optimizer.md
+        - Raster data:
+            - Raster input and output: api/sql/Raster-loader.md
+            - Raster operators: api/sql/Raster-operators.md
         - Parameter: api/sql/Parameter.md
       - Viz:
         - DataFrame/SQL: api/viz/sql.md
diff --git a/pom.xml b/pom.xml
index c721318..d1f5e27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,6 +159,19 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!--for GeoTiff Reader-->
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-geotiff</artifactId>
+            <version>${geotools.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-coverage</artifactId>
+            <version>${geotools.version}</version>
+            <scope>${dependency.scope}</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_${scala.compat.version}</artifactId>
diff --git a/python-adapter/.gitignore b/python-adapter/.gitignore
index 804f11a..67d8d31 100644
--- a/python-adapter/.gitignore
+++ b/python-adapter/.gitignore
@@ -2,4 +2,5 @@
 bin
 /.settings
 /.classpath
-/.project
\ No newline at end of file
+/.project
+*.iml
\ No newline at end of file
diff --git a/python-adapter/pom.xml b/python-adapter/pom.xml
index 9844754..4b898c9 100644
--- a/python-adapter/pom.xml
+++ b/python-adapter/pom.xml
@@ -87,6 +87,19 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <!--for GeoTiff Reader-->
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-geotiff</artifactId>
+            <version>${geotools.version}</version>
+            <scope>${geotools.scope}</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.geotools</groupId>
+            <artifactId>gt-coverage</artifactId>
+            <version>${geotools.version}</version>
+            <scope>${geotools.scope}</scope>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/main/scala</sourceDirectory>
diff --git a/spark-version-converter.py b/spark-version-converter.py
index c04ffe2..ac6fb41 100644
--- a/spark-version-converter.py
+++ b/spark-version-converter.py
@@ -24,7 +24,8 @@ spark3_anchor = 'SPARK3 anchor'
 files = ['sql/src/main/scala/org/apache/sedona/sql/UDF/UdfRegistrator.scala',
          'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/TraitJoinQueryExec.scala',
          'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/JoinQueryDetector.scala',
-         'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala']
+         'sql/src/main/scala/org/apache/spark/sql/sedona_sql/strategy/join/BroadcastIndexJoinExec.scala',
+         'sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala']
 
 def switch_version(line):
     if line[:2] == '//':
diff --git a/sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..6592fb0
--- /dev/null
+++ b/sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1 @@
+org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index c8b6fdb..5d0b052 100644
--- a/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -21,10 +21,12 @@ package org.apache.sedona.sql.UDF
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}
 import org.apache.spark.sql.sedona_sql.expressions._
+import org.apache.spark.sql.sedona_sql.expressions.raster.{RS_AddBands, RS_Array, RS_Base64, RS_BitwiseAnd, RS_BitwiseOr, RS_Count, RS_DivideBands, RS_FetchRegion, RS_GetBand, RS_GreaterThan, RS_GreaterThanEqual, RS_HTML, RS_LessThan, RS_LessThanEqual, RS_LogicalDifference, RS_LogicalOver, RS_Mean, RS_Mode, RS_Modulo, RS_MultiplyBands, RS_MultiplyFactor, RS_Normalize, RS_NormalizedDifference, RS_SquareRoot, RS_SubtractBands}
 import org.locationtech.jts.geom.Geometry
 
 object Catalog {
   val expressions: Seq[FunctionBuilder] = Seq(
+    // Expressions for vectors
     ST_PointFromText,
     ST_PolygonFromText,
     ST_LineStringFromText,
@@ -83,7 +85,34 @@ object Catalog {
     ST_LineSubstring,
     ST_LineInterpolatePoint,
     ST_SubDivideExplode,
-    ST_SubDivide
+    ST_SubDivide,
+
+    // Expressions for rasters
+    RS_NormalizedDifference,
+    RS_Mean,
+    RS_Mode,
+    RS_FetchRegion,
+    RS_GreaterThan,
+    RS_GreaterThanEqual,
+    RS_LessThan,
+    RS_LessThanEqual,
+    RS_AddBands,
+    RS_SubtractBands,
+    RS_DivideBands,
+    RS_MultiplyFactor,
+    RS_MultiplyBands,
+    RS_BitwiseAnd,
+    RS_BitwiseOr,
+    RS_Count,
+    RS_Modulo,
+    RS_GetBand,
+    RS_SquareRoot,
+    RS_LogicalDifference,
+    RS_LogicalOver,
+    RS_Base64,
+    RS_HTML,
+    RS_Array,
+    RS_Normalize
   )
 
   val aggregateExpressions: Seq[Aggregator[Geometry, Geometry, Geometry]] = Seq(
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
index bbe8205..d9e60e7 100644
--- a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/Functions.scala
@@ -1125,7 +1125,6 @@ case class ST_FlipCoordinates(inputExpressions: Seq[Expression])
   override def children: Seq[Expression] = inputExpressions
 }
 
-
 case class ST_SubDivide(inputExpressions: Seq[Expression])
   extends Expression with CodegenFallback {
   override def nullable: Boolean = true
@@ -1167,4 +1166,4 @@ case class ST_SubDivideExplode(children: Seq[Expression]) extends Generator {
   }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev
-}
\ No newline at end of file
+}
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
new file mode 100644
index 0000000..760d627
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Functions.scala
@@ -0,0 +1,865 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.expressions.raster
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.sedona_sql.expressions.UserDataGeneratator
+import org.apache.spark.sql.types._
+
+
+
+// Calculate the normalized difference between two bands
+case class RS_NormalizedDifference(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val ndvi = normalizeddifference(band1, band2)
+
+    new GenericArrayData(ndvi)
+  }
+  private def normalizeddifference(band1: Array[Double], band2: Array[Double]): Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for (i <- 0 until band1.length) {
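+      // Replace zeros with -1 to avoid a 0/0 division when both bands are 0 at a pixel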
+      if (band1(i) == 0) {
+        band1(i) = -1
+      }
+      if (band2(i) == 0) {
+        band2(i) = -1
+      }
+
+      result(i) = ((band2(i) - band1(i)) / (band2(i) + band1(i))*100).round/100.toDouble
+    }
+
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
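+
+// A minimal usage sketch (hypothetical DataFrame/column names; assumes "band1" and
+// "band2" are Array[Double] columns, e.g. red and near-infrared bands):
+//   df.selectExpr("RS_NormalizedDifference(band1, band2) as ndvi")
+// Each element is (band2 - band1) / (band2 + band1), rounded to two decimal places.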
+
+// Calculate mean value for a particular band
+case class RS_Mean(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes one input expression
+    assert(inputExpressions.length == 1)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val mean = calculateMean(band)
+    mean
+  }
+
+  private def calculateMean(band:Array[Double]):Double = {
+
+    ((band.toList.sum/band.length)*100).round/100.toDouble
+  }
+
+
+  override def dataType: DataType = DoubleType
+
+  override def children: Seq[Expression] = inputExpressions
+}
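+
+// Usage sketch (hypothetical column name "band"):
+//   df.selectExpr("RS_Mean(band) as mean")
+// returns a Double rounded to two decimal places.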
+
+// Calculate mode of a particular band
+case class RS_Mode(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes one input expression
+    assert(inputExpressions.length == 1)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val mode = calculateMode(band)
+    new GenericArrayData(mode)
+  }
+
+  private def calculateMode(band:Array[Double]):Array[Double] = {
+
+    val grouped = band.toList.groupBy(x => x).mapValues(_.size)
+    val modeValue = grouped.maxBy(_._2)._2
+    val modes = grouped.filter(_._2 == modeValue).map(_._1)
+    modes.toArray
+
+  }
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
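+
+// Usage sketch (hypothetical column name "band"):
+//   df.selectExpr("RS_Mode(band) as modes")
+// returns all most-frequent values, so the result array may contain several ties.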
+
+// Fetch a particular region from a raster band given inclusive pixel bounds (Array(minX, minY, maxX, maxY)) and the band dimensions
+case class RS_FetchRegion(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes three input expressions
+    assert(inputExpressions.length == 3)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val coordinates =  inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toIntArray()
+    val dim = inputExpressions(2).eval(inputRow).asInstanceOf[GenericArrayData].toIntArray()
+    new GenericArrayData(regionEnclosed(band, coordinates,dim))
+
+  }
+
+  private def regionEnclosed(Band: Array[Double], coordinates: Array[Int], dim: Array[Int]):Array[Double] = {
+
+    val result1D = new Array[Double]((coordinates(2) - coordinates(0) + 1) * (coordinates(3) - coordinates(1) + 1))
+
+    var k = 0
+    for(i<-coordinates(0) until coordinates(2) + 1) {
+      for(j<-coordinates(1) until coordinates(3) + 1) {
+        result1D(k) = Band(i * dim(0) + j)
+        k+=1
+      }
+    }
+    result1D
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
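+
+// Usage sketch (hypothetical names). For a band stored row-major with width dim(0),
+// the bounds array is inclusive: (minX, minY, maxX, maxY). For example,
+//   df.selectExpr("RS_FetchRegion(band, array(0, 0, 1, 2), array(3, 3)) as region")
+// would return the 2 x 3 sub-region of a 3 x 3 band as a flat array.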
+
+// Mark with 1 all band values that are greater than a given threshold
+case class RS_GreaterThan(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val target = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+    new GenericArrayData(findGreaterThan(band, target))
+
+  }
+
+  private def findGreaterThan(band: Array[Double], target: Double):Array[Double] = {
+
+    val result = new Array[Double](band.length)
+    for(i<-0 until band.length) {
+      if(band(i)>target) {
+        result(i) = 1
+      }
+      else {
+        result(i) = 0
+      }
+    }
+    result
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Mark with 1 all band values that are greater than or equal to a given threshold
+case class RS_GreaterThanEqual(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val target = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+    new GenericArrayData(findGreaterThanEqual(band, target))
+
+  }
+
+  private def findGreaterThanEqual(band: Array[Double], target: Double):Array[Double] = {
+
+    val result = new Array[Double](band.length)
+    for(i<-0 until band.length) {
+      if(band(i)>=target) {
+        result(i) = 1
+      }
+      else {
+        result(i) = 0
+      }
+    }
+    result
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Mark with 1 all band values that are less than a given threshold
+case class RS_LessThan(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val target = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+    new GenericArrayData(findLessThan(band, target))
+
+  }
+
+  private def findLessThan(band: Array[Double], target: Double):Array[Double] = {
+
+    val result = new Array[Double](band.length)
+    for(i<-0 until band.length) {
+      if(band(i)<target) {
+        result(i) = 1
+      }
+      else {
+        result(i) = 0
+      }
+    }
+    result
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Mark with 1 all band values that are less than or equal to a given threshold
+case class RS_LessThanEqual(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val target = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+    new GenericArrayData(findLessThanEqual(band, target))
+
+  }
+
+  private def findLessThanEqual(band: Array[Double], target: Double):Array[Double] = {
+
+    val result = new Array[Double](band.length)
+    for(i<-0 until band.length) {
+      if(band(i)<=target) {
+        result(i) = 1
+      }
+      else {
+        result(i) = 0
+      }
+    }
+    result
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
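+
+// Usage sketch for the four comparison expressions above (hypothetical column name).
+// The threshold is read as a Decimal, so pass a decimal literal:
+//   df.selectExpr("RS_GreaterThan(band, 0.2) as mask")
+// produces a 0/1 mask of the same length as the input band.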
+
+// Count the number of occurrences of a particular value in a band
+case class RS_Count(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+
+    val target = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+    findCount(band, target)
+
+  }
+
+  private def findCount(band: Array[Double], target: Double):Int = {
+
+    var result = 0
+    for(i<-0 until band.length) {
+      if(band(i)==target) {
+        result+=1
+      }
+
+    }
+    result
+  }
+
+  override def dataType: DataType = IntegerType
+
+  override def children: Seq[Expression] = inputExpressions
+}
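+
+// Usage sketch (hypothetical column name); the target is read as a Decimal literal:
+//   df.selectExpr("RS_Count(band, 255.0) as cnt")
+// counts how many band values equal the target exactly.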
+
+// Multiply all values of a band by a factor
+case class RS_MultiplyFactor(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val target = inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+    new GenericArrayData(multiply(band, target))
+
+  }
+
+  private def multiply(band: Array[Double], target: Int):Array[Double] = {
+
+    var result = new Array[Double](band.length)
+    for(i<-0 until band.length) {
+
+      result(i) = band(i)*target
+
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
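+
+// Usage sketch (hypothetical column name); the factor is read as an Int,
+// so pass an integer literal:
+//   df.selectExpr("RS_MultiplyFactor(band, 2) as doubled")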
+
+// Add two bands
+case class RS_AddBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(addBands(band1, band2))
+
+  }
+
+  private def addBands(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      result(i) = band1(i) + band2(i)
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Subtract two bands
+case class RS_SubtractBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(subtractBands(band1, band2))
+
+  }
+
+  private def subtractBands(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      result(i) = band2(i) - band1(i)
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Multiply two bands
+case class RS_MultiplyBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(multiplyBands(band1, band2))
+
+  }
+
+  private def multiplyBands(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      result(i) = band1(i) * band2(i)
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Divide two bands
+case class RS_DivideBands(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(divideBands(band1, band2))
+
+  }
+
+  private def divideBands(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      result(i) = ((band1(i)/band2(i))*100).round/(100.toDouble)
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
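+
+// Usage sketch for the band arithmetic above (hypothetical column names), e.g.:
+//   df.selectExpr("RS_AddBands(band1, band2)", "RS_DivideBands(band1, band2)")
+// Both input bands must have the same length. Note the argument order of
+// RS_SubtractBands: it returns band2 - band1, and RS_DivideBands rounds each
+// quotient to two decimal places.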
+
+// Modulo of a band
+case class RS_Modulo(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    val dividend = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+
+
+    new GenericArrayData(modulo(band, dividend))
+
+  }
+
+  private def modulo(band: Array[Double], dividend:Double):Array[Double] = {
+
+    val result = new Array[Double](band.length)
+    for(i<-0 until band.length) {
+      result(i) = band(i) % dividend
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Square root of values in a band
+case class RS_SquareRoot(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes one input expression
+    assert(inputExpressions.length == 1)
+    var band:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    new GenericArrayData(squareRoot(band))
+
+  }
+
+  private def squareRoot(band: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band.length)
+    for(i<-0 until band.length) {
+      result(i) = (Math.sqrt(band(i))*100).round/100.toDouble
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
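+
+// Usage sketch (hypothetical column name); RS_Modulo reads its divisor as a Decimal:
+//   df.selectExpr("RS_Modulo(band, 90.0)", "RS_SquareRoot(band)")
+// RS_SquareRoot rounds each value to two decimal places.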
+
+// Bitwise AND between two bands
+case class RS_BitwiseAnd(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(bitwiseAnd(band1, band2))
+
+  }
+
+  private def bitwiseAnd(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      result(i) = band1(i).toInt & band2(i).toInt
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// Bitwise OR between two bands
+case class RS_BitwiseOr(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(bitwiseOr(band1, band2))
+
+  }
+
+  private def bitwiseOr(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      result(i) = band1(i).toInt | band2(i).toInt
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
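+
+// Usage sketch for the bitwise expressions (hypothetical column names):
+//   df.selectExpr("RS_BitwiseAnd(band1, band2)", "RS_BitwiseOr(band1, band2)")
+// Values are truncated to Int before the bitwise operation, then stored as Double.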
+
+// If the values in band1 and band2 differ, the value from band1 is returned; otherwise 0 is returned
+case class RS_LogicalDifference(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(logicalDifference(band1, band2))
+
+  }
+
+  private def logicalDifference(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      if(band1(i) != band2(i))
+      {
+        result(i) = band1(i)
+      }
+      else
+      {
+        result(i) = 0.0
+      }
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
+
+// If a value in band1 is not equal to 0, the value from band1 is returned; otherwise the value from band2 is returned
+case class RS_LogicalOver(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    var band1:Array[Double] = null
+    var band2:Array[Double] = null
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData" &&
+      inputExpressions(1).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData"
+    ) {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band1 = inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      band2 = inputExpressions(1).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    }
+    assert(band1.length == band2.length)
+
+    new GenericArrayData(logicalOver(band1, band2))
+
+  }
+
+  private def logicalOver(band1: Array[Double], band2: Array[Double]):Array[Double] = {
+
+    val result = new Array[Double](band1.length)
+    for(i<-0 until band1.length) {
+      if(band1(i) != 0.0)
+      {
+        result(i) = band1(i)
+      }
+      else
+      {
+        result(i) = band2(i)
+      }
+    }
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
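+
+// Usage sketch for the logical expressions (hypothetical column names):
+//   df.selectExpr("RS_LogicalDifference(band1, band2)", "RS_LogicalOver(band1, band2)")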
+
+case class RS_Normalize(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes one input expression
+    var band:Array[Double] = null
+
+    if(inputExpressions(0).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+      band =inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    }
+    else {
+      band =inputExpressions(0).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+
+    }
+    val result = normalize(band)
+    new GenericArrayData(result)
+  }
+
+  // Normalize between 0 and 255
+  private def normalize(band: Array[Double]): Array[Double] = {
+
+    val result = new Array[Double](band.length)
+    val maxVal = band.toList.max
+
+    for(i<-0 until band.length) {
+      result(i) = (band(i)/(maxVal/255.0)).toInt
+    }
+
+    result
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
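+
+// Usage sketch (hypothetical column name):
+//   df.selectExpr("RS_Normalize(band)")
+// rescales the band into [0, 255] by dividing by max / 255, which is handy
+// before rendering with RS_Base64 later in this patch.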
+
+
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
new file mode 100644
index 0000000..a725ad6
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/IO.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.sedona_sql.expressions.raster
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Expression, UnsafeArrayData}
+import org.apache.spark.sql.catalyst.util.GenericArrayData
+import org.apache.spark.sql.sedona_sql.expressions.UserDataGeneratator
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+import org.geotools.coverage.grid.io.GridFormatFinder
+import org.geotools.coverage.grid.{GridCoordinates2D, GridCoverage2D}
+import org.geotools.geometry.jts.JTS
+import org.geotools.referencing.CRS
+import org.geotools.util.factory.Hints
+import org.locationtech.jts.geom.{Coordinate, Geometry, GeometryFactory}
+import org.opengis.coverage.grid.GridEnvelope
+import org.opengis.referencing.crs.CoordinateReferenceSystem
+import org.opengis.referencing.operation.MathTransform
+
+import java.awt.Color
+import java.awt.image.BufferedImage
+import java.io.{ByteArrayOutputStream, IOException}
+import java.util.Base64
+import javax.imageio.ImageIO
+
+class GeometryOperations {
+
+  var coverage:GridCoverage2D = null
+  var source:CoordinateReferenceSystem = null
+  var target:CoordinateReferenceSystem = null
+  var targetCRS:MathTransform =  null
+
+  def getDimensions(url:String):GridEnvelope = {
+    val format = GridFormatFinder.findFormat(url)
+    val hints = new Hints(Hints.FORCE_LONGITUDE_FIRST_AXIS_ORDER, true)
+    val reader = format.getReader(url, hints)
+
+
+    try coverage = reader.read(null)
+    catch {
+      case giveUp: IOException =>
+        throw new RuntimeException(giveUp)
+    }
+    reader.dispose()
+    source = coverage.getCoordinateReferenceSystem
+    target = CRS.decode("EPSG:4326", true)
+    targetCRS = CRS.findMathTransform(source, target)
+    val gridRange2D = coverage.getGridGeometry.getGridRange
+    gridRange2D
+
+  }
+   def readGeometry(url: String): Geometry = {
+    val gridRange2D = getDimensions(url)
+    val cords = Array(Array(gridRange2D.getLow(0), gridRange2D.getLow(1)), Array(gridRange2D.getLow(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getLow(1)))
+    val polyCoordinates = new Array[Coordinate](5)
+    var index = 0
+
+    for (point <- cords) {
+      val coordinate2D = new GridCoordinates2D(point(0), point(1))
+      val result = coverage.getGridGeometry.gridToWorld(coordinate2D)
+      polyCoordinates({
+        index += 1; index - 1
+      }) = new Coordinate(result.getOrdinate(0), result.getOrdinate(1))
+    }
+
+    polyCoordinates(index) = polyCoordinates(0)
+    val factory = new GeometryFactory
+    val polygon = JTS.transform(factory.createPolygon(polyCoordinates), targetCRS)
+
+    polygon
+
+  }
+}
+
+// Get a particular band from the result of ST_GeomWithBandsFromGeoTiff
+case class RS_GetBand(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes three input expressions
+    assert(inputExpressions.length == 3)
+    val bandInfo =inputExpressions(0).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+    val targetBand = inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+    val totalBands = inputExpressions(2).eval(inputRow).asInstanceOf[Int]
+    val result = gettargetband(bandInfo, targetBand, totalBands)
+    new GenericArrayData(result)
+  }
+
+  // fetch target band from the given array of bands
+  private def gettargetband(bandinfo: Array[Double], targetband:Int, totalbands:Int): Array[Double] = {
+    val sizeOfBand = bandinfo.length/totalbands
+    val lowerBound = (targetband - 1)*sizeOfBand
+    val upperBound = targetband*sizeOfBand
+    assert(bandinfo.slice(lowerBound,upperBound).length == sizeOfBand)
+    bandinfo.slice(lowerBound, upperBound)
+
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
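+
+// Usage sketch (hypothetical column names); the band index is 1-based:
+//   df.selectExpr("RS_GetBand(data, 1, nBands) as band1")
+// slices one contiguous band out of the flat array produced by the geotiff loader.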
+
+case class RS_Array(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes two input expressions
+    assert(inputExpressions.length == 2)
+    val len =inputExpressions(0).eval(inputRow).asInstanceOf[Int]
+    val num = inputExpressions(1).eval(inputRow).asInstanceOf[Decimal].toDouble
+    val result = createarray(len, num)
+    new GenericArrayData(result)
+  }
+
+  // Generate an array of the given length filled with the given value, e.g. an empty band for a geotiff image
+  private def createarray(len:Int, num:Double):Array[Double] = {
+
+    val result = new Array[Double](len)
+    for(i<-0 until len) {
+      result(i) = num
+    }
+    result
+  }
+
+  override def dataType: DataType = ArrayType(DoubleType)
+
+  override def children: Seq[Expression] = inputExpressions
+}
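+
+// Usage sketch; the fill value is read as a Decimal literal:
+//   df.selectExpr("RS_Array(height * width, 0.0) as empty_band")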
+
+case class RS_Base64(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes five or six input expressions
+    assert(inputExpressions.length>=5 && inputExpressions.length<=6)
+
+    val height = inputExpressions(0).eval(inputRow).asInstanceOf[Int]
+    val width = inputExpressions(1).eval(inputRow).asInstanceOf[Int]
+    val band1 = inputExpressions(2).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    val band2 = inputExpressions(3).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    val band3 = inputExpressions(4).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+    var bufferedimage:BufferedImage = null
+    if(inputExpressions.length==5) {
+        bufferedimage = getBufferedimage(band1, band2, band3, null , height, width)
+      }
+    else {
+      var band4:Array[Double] = null
+      if(inputExpressions(5).eval(inputRow).getClass.toString() == "class org.apache.spark.sql.catalyst.expressions.UnsafeArrayData") {
+        band4 = inputExpressions(5).eval(inputRow).asInstanceOf[UnsafeArrayData].toDoubleArray()
+      }
+      else {
+        band4 = inputExpressions(5).eval(inputRow).asInstanceOf[GenericArrayData].toDoubleArray()
+      }
+      bufferedimage = getBufferedimage(band1, band2, band3, band4, height, width)
+    }
+
+    val result = convertToBase64(bufferedimage)
+    UTF8String.fromString(result)
+  }
+
+  private def getBufferedimage(band1:Array[Double], band2:Array[Double], band3:Array[Double], band4:Array[Double], height:Int, width:Int): BufferedImage = {
+    val image = new BufferedImage(width, height, BufferedImage.TYPE_INT_ARGB)
+    var w = 0
+    var h = 0
+    band4 match {
+      case null => {
+        for (i <- 0 until (height * width)) {
+          if (i > 0 && i % width == 0) {
+            h+=1
+          }
+          w = i%width
+          image.setRGB(w, h, new Color(band1(i).toInt, band2(i).toInt, band3(i).toInt, 255).getRGB())
+        }
+        image
+      }
+      case _ => {
+        for (i <- 0 until (height * width)) {
+          if (i > 0 && i % width == 0) {
+            h+=1
+          }
+          w = i%width
+          image.setRGB(w, h, new Color(band1(i).toInt, band2(i).toInt, band3(i).toInt, band4(i).toInt).getRGB())
+
+        }
+        image
+      }
+
+    }
+  }
+
+  // Convert Buffered image to Base64 String
+   private def convertToBase64(image: BufferedImage): String = {
+
+    val os = new ByteArrayOutputStream()
+    ImageIO.write(image,"png", os)
+    Base64.getEncoder.encodeToString(os.toByteArray)
+  }
+
+
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = inputExpressions
+}
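+
+// Usage sketch (hypothetical column names); bands are expected to already be in
+// the 0-255 range (e.g. via RS_Normalize):
+//   df.selectExpr("RS_Base64(height, width, red, green, blue) as base64")
+// encodes the bands as a PNG and returns it as a Base64 string.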
+
+case class RS_HTML(inputExpressions: Seq[Expression])
+  extends Expression with CodegenFallback with UserDataGeneratator {
+  override def nullable: Boolean = false
+
+  override def eval(inputRow: InternalRow): Any = {
+    // This expression takes one or two input expressions
+    val encodedstring =inputExpressions(0).eval(inputRow).asInstanceOf[UTF8String].toString
+    // Add image width if needed
+    var imageWidth = "200"
+    if (inputExpressions.length == 2) imageWidth = inputExpressions(1).eval(inputRow).asInstanceOf[UTF8String].toString
+    val result = htmlstring(encodedstring, imageWidth)
+    UTF8String.fromString(result)
+  }
+
+  // create HTML string from Base64 string
+  private def htmlstring(encodestring: String, imageWidth: String): String = {
+    "<img src=\"" + createmainstring(encodestring) + "\" width=\"" + imageWidth + "\" />"
+  }
+
+  private def createmainstring(encodestring:String): String = {
+
+    val result = s"data:image/png;base64,$encodestring"
+    result
+  }
+  override def dataType: DataType = StringType
+
+  override def children: Seq[Expression] = inputExpressions
+}
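+
+// Usage sketch (hypothetical column name):
+//   df.selectExpr("RS_HTML(base64, '300') as html")
+// wraps the Base64 PNG in an <img> tag (default width 200) for notebook display.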
+
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
new file mode 100644
index 0000000..cf9a314
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.spark.sql.sedona_sql.io
+
+
+import com.google.common.io.{ByteStreams, Closeables}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
+import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableConfiguration
+
+private[spark] class GeotiffFileFormat extends FileFormat with DataSourceRegister {
+
+  override def inferSchema(
+                            sparkSession: SparkSession,
+                            options: Map[String, String],
+                            files: Seq[FileStatus]): Option[StructType] = Some(GeotiffSchema.imageSchema)
+
+  override def prepareWrite(
+                             sparkSession: SparkSession,
+                             job: Job,
+                             options: Map[String, String],
+                             dataSchema: StructType): OutputWriterFactory = {
+    throw new UnsupportedOperationException("Write is not supported for image data source")
+  }
+
+  override def shortName(): String = "geotiff"
+
+  override protected def buildReader(
+                                      sparkSession: SparkSession,
+                                      dataSchema: StructType,
+                                      partitionSchema: StructType,
+                                      requiredSchema: StructType,
+                                      filters: Seq[Filter],
+                                      options: Map[String, String],
+                                      hadoopConf: Configuration): (PartitionedFile) => Iterator[InternalRow] = {
+    assert(
+      requiredSchema.length <= 1,
+      "Image data source only produces a single data column named \"image\".")
+
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    val imageSourceOptions = new ImageOptions(options)
+
+    (file: PartitionedFile) => {
+      val emptyUnsafeRow = new UnsafeRow(0)
+      if (!imageSourceOptions.dropInvalid && requiredSchema.isEmpty) {
+        Iterator(emptyUnsafeRow)
+      } else {
+        val origin = file.filePath
+        val path = new Path(origin)
+        val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
+        val stream = fs.open(path)
+        val bytes = try {
+          ByteStreams.toByteArray(stream)
+        } finally {
+          Closeables.close(stream, true)
+        }
+
+        val resultOpt = GeotiffSchema.decode(origin, bytes)
+        val filteredResult = if (imageSourceOptions.dropInvalid) {
+          resultOpt.toIterator
+        } else {
+          Iterator(resultOpt.getOrElse(GeotiffSchema.invalidImageRow(origin)))
+        }
+
+        if (requiredSchema.isEmpty) {
+          filteredResult.map(_ => emptyUnsafeRow)
+        } else {
+          val converter = RowEncoder(requiredSchema).createSerializer() // SPARK3 anchor
+           filteredResult.map(row => converter(row)) // SPARK3 anchor
+//          val converter = RowEncoder(requiredSchema) // SPARK2 anchor
+//          filteredResult.map(row => converter.toRow(row)) // SPARK2 anchor
+        }
+      }
+    }
+  }
+}
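+
+// With this FileFormat registered, geotiffs can be loaded directly (mirroring the
+// test at the end of this patch):
+//   val df = spark.read.format("geotiff").option("dropInvalid", true).load(path)
+//   df.selectExpr("image.origin", "image.Geometry", "image.height", "image.width",
+//                 "image.nBands", "image.data")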
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala
new file mode 100644
index 0000000..bafa0f2
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.io
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.types._
+import org.geotools.coverage.grid.io.{AbstractGridFormat, GridCoverage2DReader, OverviewPolicy}
+import org.geotools.coverage.grid.{GridCoordinates2D, GridCoverage2D}
+import org.geotools.gce.geotiff.GeoTiffReader
+import org.geotools.geometry.jts.JTS
+import org.geotools.referencing.CRS
+import org.locationtech.jts.geom.{Coordinate, GeometryFactory}
+import org.opengis.coverage.grid.{GridCoordinates, GridEnvelope}
+import org.opengis.parameter.{GeneralParameterValue, ParameterValue}
+
+import java.io.ByteArrayInputStream
+
+object GeotiffSchema {
+  val undefinedImageType = "Undefined"
+
+  /**
+   * Schema for the image column: Row(String, Geometry, Int, Int, Int, Array[Double])
+   */
+  val columnSchema = StructType(
+    StructField("origin", StringType, true) ::
+      StructField("Geometry", GeometryUDT, true) ::
+      StructField("height", IntegerType, false) ::
+      StructField("width", IntegerType, false) ::
+      StructField("nBands", IntegerType, false) ::
+      StructField("data", ArrayType(DoubleType), false) :: Nil)
+
+  val imageFields: Array[String] = columnSchema.fieldNames
+
+  /**
+   * DataFrame with a single column of images named "image" (nullable)
+   */
+  val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)
+
+  /**
+   * Gets the origin of the image
+   *
+   * @return The origin of the image
+   */
+  def getOrigin(row: Row): String = row.getString(0)
+
+  /**
+   * Gets the geometry (footprint) of the image
+   *
+   * @return The geometry of the image
+   */
+  def getGeometry(row: Row): GeometryUDT = row.getAs[GeometryUDT](1)
+
+
+  /**
+   * Gets the height of the image
+   *
+   * @return The height of the image
+   */
+  def getHeight(row: Row): Int = row.getInt(2)
+
+  /**
+   * Gets the width of the image
+   *
+   * @return The width of the image
+   */
+  def getWidth(row: Row): Int = row.getInt(3)
+
+  /**
+   * Gets the number of channels in the image
+   *
+   * @return The number of bands in the image
+   */
+  def getNBands(row: Row): Int = row.getInt(4)
+
+
+  /**
+   * Gets the image data
+   *
+   * @return The image data
+   */
+  def getData(row: Row): Array[Double] = row.getAs[Array[Double]](5)
+
+  /**
+   * Default values for the invalid image
+   *
+   * @param origin Origin of the invalid image
+   * @return Row with the default values
+   */
+  private[io] def invalidImageRow(origin: String): Row =
+    Row(Row(origin, null, -1, -1, -1, Array.ofDim[Double](0)))
+
+  /**
+   *
+   * Convert a GeoTiff image into a dataframe row
+   *
+   *
+   * @param origin Arbitrary string that identifies the image
+   * @param bytes  Image bytes (for example, jpeg)
+   * @return DataFrame Row or None (if the decompression fails)
+   *
+   */
+
+  private[io] def decode(origin: String, bytes: Array[Byte]): Option[Row] = {
+
+    val policy: ParameterValue[OverviewPolicy] = AbstractGridFormat.OVERVIEW_POLICY.createValue
+    policy.setValue(OverviewPolicy.IGNORE)
+    val gridsize: ParameterValue[String] = AbstractGridFormat.SUGGESTED_TILE_SIZE.createValue
+    val useJaiRead: ParameterValue[Boolean] = AbstractGridFormat.USE_JAI_IMAGEREAD.createValue.asInstanceOf[ParameterValue[Boolean]]
+    useJaiRead.setValue(true)
+
+    // Read Geotiff image from Byte Array
+    val reader: GridCoverage2DReader = try {
+      new GeoTiffReader(new ByteArrayInputStream(bytes))
+    } catch {
+      // Catch runtime exception because `ImageIO` may throw unexpected `RuntimeException`.
+      // But do not catch the declared `IOException` (regarded as FileSystem failure)
+      case _: RuntimeException => null
+    }
+    if (reader == null) {
+      return None
+    }
+    val coverage: GridCoverage2D =
+      reader.read(Array[GeneralParameterValue](policy, gridsize, useJaiRead))
+
+    // Fetch geometry from given image
+    val source = coverage.getCoordinateReferenceSystem
+    val target = CRS.decode("EPSG:4326", true)
+    val targetCRS = CRS.findMathTransform(source, target)
+    val gridRange2D = coverage.getGridGeometry.getGridRange
+    val cords = Array(Array(gridRange2D.getLow(0), gridRange2D.getLow(1)), Array(gridRange2D.getLow(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getHigh(1)), Array(gridRange2D.getHigh(0), gridRange2D.getLow(1)))
+    val polyCoordinates = new Array[Coordinate](5)
+    var index = 0
+
+    for (point <- cords) {
+      val coordinate2D = new GridCoordinates2D(point(0), point(1))
+      val result = coverage.getGridGeometry.gridToWorld(coordinate2D)
+      polyCoordinates({
+        index += 1;
+        index - 1
+      }) = new Coordinate(result.getOrdinate(0), result.getOrdinate(1))
+    }
+
+    polyCoordinates(index) = polyCoordinates(0)
+    val factory = new GeometryFactory
+    val polygon = JTS.transform(factory.createPolygon(polyCoordinates), targetCRS)
+
+    // Fetch band values from given image
+    val nBands: Int = coverage.getNumSampleDimensions
+    val dimensions: GridEnvelope = reader.getOriginalGridRange
+    val maxDimensions: GridCoordinates = dimensions.getHigh
+    val width: Int = maxDimensions.getCoordinateValue(0) + 1
+    val height: Int = maxDimensions.getCoordinateValue(1) + 1
+    val imageSize = height * width * nBands
+    assert(imageSize < 1e9, "image is too large")
+    val decoded = Array.ofDim[Double](imageSize)
+
+    for (i <- 0 until height) {
+      for (j <- 0 until width) {
+        val vals: Array[Double] = new Array[Double](nBands)
+        coverage.evaluate(new GridCoordinates2D(j, i), vals)
+        // bands of a pixel will be put in [b0...b1...b2...]
+        // Each "..." represent w * h pixels
+        for (bandId <- 0 until nBands) {
+          val offset = width * height * bandId + i * width + j
+          decoded(offset) = vals(bandId)
+        }
+      }
+    }
+    // the internal "Row" is needed, because the image is a single DataFrame column
+    Some(Row(Row(origin, polygon, height, width, nBands, decoded)))
+  }
+}
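+
+// Layout sketch: for a pixel at row i, column j in band b, its value lives at
+//   offset = b * width * height + i * width + j
+// in the flat "data" array, i.e. bands are stored one after another, which is
+// what RS_GetBand relies on when slicing.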
+
+
+
+
+
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala
new file mode 100644
index 0000000..54c5377
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.io
+
+import org.apache.commons.io.FilenameUtils
+import org.apache.hadoop.conf.{Configuration, Configured}
+import org.apache.hadoop.fs.{Path, PathFilter}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.spark.sql.SparkSession
+
+import scala.language.existentials
+import scala.util.Random
+
+object RecursiveFlag {
+
+  /** Sets a value of spark recursive flag
+   *
+   * @param value value to set
+   * @param spark existing spark session
+   * @return previous value of this flag
+   */
+  def setRecursiveFlag(value: Option[String], spark: SparkSession): Option[String] = {
+    val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val old = Option(hadoopConf.get(flagName))
+
+    value match {
+      case Some(v) => hadoopConf.set(flagName, v)
+      case None => hadoopConf.unset(flagName)
+    }
+
+    old
+  }
+}
+
+
+/** Filter that allows loading a fraction of HDFS files. */
+class SamplePathFilter extends Configured with PathFilter {
+  val random = {
+    val rd = new Random()
+    rd.setSeed(0)
+    rd
+  }
+
+  // Ratio of files to be read from disk
+  var sampleRatio: Double = 1
+
+  override def setConf(conf: Configuration): Unit = {
+    if (conf != null) {
+      sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
+    }
+  }
+
+  override def accept(path: Path): Boolean = {
+    // Note: checking fileSystem.isDirectory is very slow here, so we use basic rules instead
+    !SamplePathFilter.isFile(path) ||
+      random.nextDouble() < sampleRatio
+  }
+}
+
+object SamplePathFilter {
+  val ratioParam = "sampleRatio"
+
+  def isFile(path: Path): Boolean = FilenameUtils.getExtension(path.toString) != ""
+
+  /** Set/unset the HDFS PathFilter
+   *
+   * @param value       Filter class that is passed to HDFS
+   * @param sampleRatio Fraction of the files that the filter picks
+   * @param spark       Existing Spark session
+   * @return
+   */
+  def setPathFilter(value: Option[Class[_]], sampleRatio: Option[Double] = None, spark: SparkSession)
+  : Option[Class[_]] = {
+    val flagName = FileInputFormat.PATHFILTER_CLASS
+    val hadoopConf = spark.sparkContext.hadoopConfiguration
+    val old = Option(hadoopConf.getClass(flagName, null))
+    if (sampleRatio.isDefined) {
+      hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio.get)
+    } else {
+      hadoopConf.unset(SamplePathFilter.ratioParam)
+    }
+
+    value match {
+      case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
+      case None => hadoopConf.unset(flagName)
+    }
+    old
+  }
+}
\ No newline at end of file
diff --git a/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageOptions.scala b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageOptions.scala
new file mode 100644
index 0000000..d736de1
--- /dev/null
+++ b/sql/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageOptions.scala
@@ -0,0 +1,12 @@
+package org.apache.spark.sql.sedona_sql.io
+
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+
+private[io] class ImageOptions(@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
+  def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
+  /**
+   * Whether to drop invalid images. If true, invalid images are removed; otherwise,
+   * invalid images are returned with empty data and all other fields filled with `-1`.
+   */
+  val dropInvalid = parameters.getOrElse("dropInvalid", "false").toBoolean
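+  // The option is typically supplied through the reader, e.g.
+  //   spark.read.format("geotiff").option("dropInvalid", "true").load(path)
+  // (option names are case-insensitive here thanks to CaseInsensitiveMap)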
+}
\ No newline at end of file
diff --git a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
index 1c33fe9..5cf0c71 100644
--- a/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
+++ b/sql/src/test/scala/org/apache/sedona/sql/TestBaseScala.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.scalatest.{BeforeAndAfterAll, FunSpec}
 
 trait TestBaseScala extends FunSpec with BeforeAndAfterAll {
+  Logger.getRootLogger().setLevel(Level.WARN)
   Logger.getLogger("org.apache").setLevel(Level.WARN)
   Logger.getLogger("com").setLevel(Level.WARN)
   Logger.getLogger("akka").setLevel(Level.WARN)
diff --git a/sql/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala b/sql/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
new file mode 100644
index 0000000..c23a1f7
--- /dev/null
+++ b/sql/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sedona.sql
+
+import org.locationtech.jts.geom.Geometry
+import org.scalatest.{BeforeAndAfter, GivenWhenThen}
+
+import scala.collection.mutable
+
+class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen {
+
+  val rasterDataLocation: String = resourceFolder + "raster/"
+
+  describe("Raster IO test") {
+    it("Should Pass geotiff loading") {
+      var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(rasterDataLocation)
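+      // Each row holds one image as a nested struct: origin, Geometry (the extent polygon),
+      // height, width, data (flattened band values) and nBands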
+      df = df.selectExpr("image.origin as origin","image.Geometry as Geom", "image.height as height", "image.width as width", "image.data as data", "image.nBands as bands")
+      assert(df.first().getAs[Geometry](1).toText == "POLYGON ((-117.64141128097314 33.94356351407699, -117.64141128097314 33.664978146501284, -117.30939395196258 33.664978146501284," +
+        " -117.30939395196258 33.94356351407699, -117.64141128097314 33.94356351407699))")
+      assert(df.first().getInt(2) == 517)
+      assert(df.first().getInt(3) == 512)
+      assert(df.first().getInt(5) == 1)
+      val blackBand = df.first().getAs[mutable.WrappedArray[Double]](4)
+      val line1 = blackBand.slice(0, 512)
+      val line2 = blackBand.slice(512, 1024)
+      assert(line1(0) == 0.0) // The first value at line 1 is black
+      assert(line2(159) == 0.0 && line2(160) == 123.0) // In the second line, value at 159 is black and at 160 is not black
+    }
+
+    it("should pass RS_GetBand") {
+      var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(resourceFolder + "raster/")
+      df = df.selectExpr(" image.data as data", "image.nBands as bands")
+      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand")
+      assert(df.first().getAs[mutable.WrappedArray[Double]](0).length == 512 * 517)
+    }
+
+    it("should pass RS_Base64") {
+      var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(resourceFolder + "raster/")
+      df = df.selectExpr("image.origin as origin","image.Geometry as Geom", "image.height as height", "image.width as width", "image.data as data", "image.nBands as bands")
+      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand", "width","height")
+      df.createOrReplaceTempView("geotiff")
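+      // RS_base64 appears to take (height, width, red, green, blue) band arrays and return a
+      // Base64-encoded PNG; here the GeoTIFF band feeds the red channel while
+      // RS_Array(length, value) builds zero-filled green and blue channels.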
+      df = sparkSession.sql("Select RS_base64(height, width, targetBand, RS_Array(height*width, 0), RS_Array(height*width, 0)) as encodedstring from geotiff")
+      assert(df.first().getAs[String](0).nonEmpty)
+    }
+
+    it("should pass RS_HTML") {
+      var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(resourceFolder + "raster/")
+      df = df.selectExpr("image.origin as origin","image.Geometry as Geom", "image.height as height", "image.width as width", "image.data as data", "image.nBands as bands")
+      df = df.selectExpr("RS_GetBand(data, 1, bands) as targetBand", "width","height")
+      df.createOrReplaceTempView("geotiff")
+      df = sparkSession.sql("Select RS_base64(height, width, targetBand, RS_Array(height*width, 0.0), RS_Array(height*width, 0.0)) as encodedstring from geotiff")
+      df = df.selectExpr("RS_HTML(encodedstring, '300') as htmlstring" )
+      assert(df.first().getAs[String](0).contains("img"))
+    }
+  }
+}
diff --git a/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala b/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
new file mode 100644
index 0000000..23fea93
--- /dev/null
+++ b/sql/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.sql
+
+import org.scalatest.{BeforeAndAfter, GivenWhenThen}
+
+import scala.collection.mutable
+
+
+class rasteralgebraTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen {
+
+  import sparkSession.implicits._
+
+  describe("should pass all the arithmetic operations on bands") {
+    it("Passed RS_AddBands") {
+      var inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(400.0, 900.0, 1400.0))).toDF("sumOfBands")
+      inputDf = inputDf.selectExpr("RS_AddBands(Band1,Band2) as sumOfBands")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+    }
+
+    it("Passed RS_SubtractBands") {
+      var inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(0.0, 100.0, 200.0))).toDF("differenceOfBands")
+      inputDf = inputDf.selectExpr("RS_SubtractBands(Band1,Band2) as differenceOfBands")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+    }
+
+    it("Passed RS_MultiplyBands") {
+      var inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(40000.0, 200000.0, 480000.0))).toDF("multiplicationOfBands")
+      inputDf = inputDf.selectExpr("RS_MultiplyBands(Band1,Band2) as multiplicationOfBands")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+    }
+
+    it("Passed RS_DivideBands") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 200.0, 500.0)), (Seq(0.4, 0.26, 0.27), Seq(0.3, 0.32, 0.43))).toDF("Band1", "Band2")
+      val expectedList = List(List(1.0, 2.0, 1.2), List(1.33, 0.81, 0.63))
+      val actualList = inputDf.selectExpr("RS_DivideBands(Band1,Band2) as divisionOfBands").as[List[Double]].collect().toList
+      val resultList = actualList zip expectedList
+      for((actual, expected) <- resultList) {
+        assert(actual == expected)
+      }
+
+    }
+
+    it("Passed RS_MultiplyFactor") {
+      var inputDf = Seq((Seq(200.0, 400.0, 600.0))).toDF("Band")
+      val expectedDF = Seq((Seq(600.0, 1200.0, 1800.0))).toDF("multiply")
+      inputDf = inputDf.selectExpr("RS_MultiplyFactor(Band, 3) as multiply")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+    }
+
+  }
+
+  describe("Should pass basic statistical tests") {
+    it("Passed RS_Mode") {
+      val inputDf = Seq((Seq(200.0, 400.0, 600.0, 200.0)), (Seq(200.0, 400.0, 600.0, 700.0))).toDF("Band")
+      val expectedResult = List(List(200.0), List(200.0, 400.0, 600.0, 700.0))
+      val actualResult = inputDf.selectExpr("RS_Mode(Band) as mode").as[List[Double]].collect().toList
+      val resultList = actualResult zip expectedResult
+      for((actual, expected) <- resultList) {
+        assert(actual == expected)
+      }
+
+    }
+
+    it("Passed RS_Mean") {
+      var inputDf = Seq((Seq(200.0, 400.0, 600.0, 200.0)), (Seq(200.0, 400.0, 600.0, 700.0)), (Seq(0.43, 0.36, 0.73, 0.56)) ).toDF("Band")
+      val expectedList = List(350.0,475.0,0.52)
+      val actualList = inputDf.selectExpr("RS_Mean(Band) as mean").as[Double].collect().toList
+      val resultList = actualList zip expectedList
+      for((actual, expected) <- resultList) {
+        assert(actual == expected)
+      }
+    }
+
+    it("Passed RS_NormalizedDifference") {
+      var inputDf = Seq((Seq(200.0, 400.0, 600.0), Seq(200.0, 500.0, 800.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(0.0, 0.11, 0.14))).toDF("normalizedDifference")
+      inputDf = inputDf.selectExpr("RS_NormalizedDifference(Band1,Band2) as normalizedDifference")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_Count") {
+      var inputDf = Seq((Seq(200.0, 400.0, 600.0, 200.0, 600.0, 600.0, 800.0))).toDF("Band")
+      val expectedDF = Seq(3).toDF("Count")
+      inputDf = inputDf.selectExpr("RS_Count(Band, 600.0) as Count")
+      assert(inputDf.first().getAs[Int](0) == expectedDF.first().getAs[Int](0))
+    }
+  }
+
+  describe("Should pass operator tests") {
+    it("Passed RS_GreaterThan") {
+      var inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(1.0,1.0,0.0,0.0,1.0,1.0,0.0)), (Seq(0.0,0.0,0.0,0.0,1.0,0.0))).toDF("GreaterThan")
+      inputDf = inputDf.selectExpr("RS_GreaterThan(Band, 0.2) as GreaterThan")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_GreaterThanEqual") {
+      var inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(1.0,1.0,0.0,1.0,1.0,1.0,0.0)), (Seq(0.0,0.0,0.0,0.0,1.0,0.0))).toDF("GreaterThanEqual")
+      inputDf = inputDf.selectExpr("RS_GreaterThanEqual(Band, 0.2) as GreaterThanEqual")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_LessThan") {
+      var inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(0.0,0.0,1.0,0.0,0.0,0.0,1.0)), (Seq(1.0,1.0,1.0,0.0,1.0))).toDF("LessThan")
+      inputDf = inputDf.selectExpr("RS_LessThan(Band, 0.2) as LessThan")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_LessThanEqual") {
+      var inputDf = Seq((Seq(0.42, 0.36, 0.18, 0.20, 0.21, 0.2001, 0.19)), (Seq(0.14, 0.13, 0.10, 0.86, 0.01))).toDF("Band")
+      val expectedDF = Seq((Seq(0.0,0.0,1.0,1.0,0.0,0.0,1.0)), (Seq(1.0,1.0,1.0,0.0,1.0))).toDF("LessThanEqual")
+      inputDf = inputDf.selectExpr("RS_LessThanEqual(Band, 0.2) as LessthanEqual")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_Modulo") {
+      var inputDf = Seq((Seq(100.0, 260.0, 189.0, 106.0, 230.0, 169.0, 196.0)), (Seq(230.0, 345.0, 136.0, 106.0, 134.0, 105.0))).toDF("Band")
+      val expectedDF = Seq((Seq(10.0, 80.0, 9.0, 16.0, 50.0, 79.0, 16.0)), (Seq(50.0, 75.0, 46.0, 16.0, 44.0, 15.0))).toDF("Modulo")
+      inputDf = inputDf.selectExpr("RS_Modulo(Band, 90.0) as Modulo")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+    }
+
+    it("Passed RS_BitwiseAND") {
+      var inputDf = Seq((Seq(10.0, 20.0, 30.0), Seq(10.0, 20.0, 30.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(10.0, 20.0, 30.0))).toDF("AND")
+      inputDf = inputDf.selectExpr("RS_BitwiseAND(Band1, Band2) as AND")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_BitwiseOR") {
+      var inputDf = Seq((Seq(10.0, 20.0, 30.0), Seq(40.0, 22.0, 62.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(42.0, 22.0, 62.0))).toDF("OR")
+      inputDf = inputDf.selectExpr("RS_BitwiseOR(Band1, Band2) as OR")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_SquareRoot") {
+      var inputDf = Seq((Seq(8.0, 16.0, 24.0))).toDF("Band")
+      val expectedDF = Seq((Seq(2.83, 4.0, 4.90))).toDF("SquareRoot")
+      inputDf = inputDf.selectExpr("RS_SquareRoot(Band) as SquareRoot")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+    }
+
+    it("Passed RS_LogicalDifference") {
+      var inputDf = Seq((Seq(10.0, 20.0, 30.0), Seq(40.0, 20.0, 50.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(10.0, 0.0, 30.0))).toDF("LogicalDifference")
+      inputDf = inputDf.selectExpr("RS_LogicalDifference(Band1, Band2) as LogicalDifference")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_LogicalOver") {
+      var inputDf = Seq((Seq(0.0, 0.0, 30.0), Seq(40.0, 20.0, 50.0))).toDF("Band1", "Band2")
+      val expectedDF = Seq((Seq(40.0, 20.0, 30.0))).toDF("LogicalOver")
+      inputDf = inputDf.selectExpr("RS_LogicalOver(Band1, Band2) as LogicalOver")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+
+    }
+
+    it("Passed RS_FetchRegion") {
+      var inputDf = Seq((Seq(100.0, 260.0, 189.0, 106.0, 230.0, 169.0, 196.0, 200.0, 460.0))).toDF("Band")
+      val expectedDF = Seq(Seq(100.0, 260.0, 189.0, 106.0, 230.0, 169.0)).toDF("Region")
+      inputDf = inputDf.selectExpr("RS_FetchRegion(Band,Array(0, 0, 1, 2),Array(3, 3)) as Region")
+      assert(inputDf.first().getAs[mutable.WrappedArray[Double]](0) == expectedDF.first().getAs[mutable.WrappedArray[Double]](0))
+    }
+
+    it("should pass RS_Normalize") {
+      var df = Seq((Seq(800.0, 900.0, 0.0, 255.0)), (Seq(100.0, 200.0, 700.0, 900.0))).toDF("Band")
+      df = df.selectExpr("RS_Normalize(Band) as normalizedBand")
+      assert(df.first().getAs[mutable.WrappedArray[Double]](0)(1) == 255)
+    }
+  }
+}