You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sedona.apache.org by ji...@apache.org on 2023/05/14 21:13:39 UTC

[sedona] branch master updated: [SEDONA-269] Add Raster data source and RS_AsGeoTiff and RS_AsArcGrid (#828)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 43f8624e [SEDONA-269] Add Raster data source and RS_AsGeoTiff and RS_AsArcGrid (#828)
43f8624e is described below

commit 43f8624ec1ea15e652171832485fd640e19fc737
Author: Jia Yu <ji...@apache.org>
AuthorDate: Sun May 14 14:13:27 2023 -0700

    [SEDONA-269] Add Raster data source and RS_AsGeoTiff and RS_AsArcGrid (#828)
---
 .../org/apache/sedona/common/raster/Outputs.java   |  96 ++++++++++++
 .../raster_geotiff_color/FAA_UTM18N_NAD83.tif      | Bin 0 -> 2785537 bytes
 docs/api/sql/Raster-writer.md                      | 164 +++++++++++++++++++++
 .../scala/org/apache/sedona/sql/UDF/Catalog.scala  |   4 +-
 .../expressions/NullSafeExpressions.scala          |   2 +-
 .../sedona_sql/expressions/raster/Outputs.scala    |  78 ++++++++++
 .../spark/sql/sedona_sql/io/HadoopUtils.scala      | 107 --------------
 .../sedona_sql/io/{ => raster}/GeotiffSchema.scala |  90 ++++++-----
 .../io/{ => raster}/ImageReadOptions.scala         |   2 +-
 .../io/{ => raster}/ImageWriteOptions.scala        |   2 +-
 .../sedona_sql/io/raster/RasterFileFormat.scala    | 139 +++++++++++++++++
 .../RasterOptions.scala}                           |  20 +--
 .../org/apache/sedona/sql/rasteralgebraTest.scala  |  44 ++++++
 ...org.apache.spark.sql.sources.DataSourceRegister |   5 +-
 .../io/{ => raster}/GeotiffFileFormat.scala        |   5 +-
 .../scala/org/apache/sedona/sql/rasterIOTest.scala |  61 +++++++-
 ...org.apache.spark.sql.sources.DataSourceRegister |   5 +-
 .../io/{ => raster}/GeotiffFileFormat.scala        |   2 +-
 .../scala/org/apache/sedona/sql/rasterIOTest.scala |  61 +++++++-
 19 files changed, 696 insertions(+), 191 deletions(-)

diff --git a/common/src/main/java/org/apache/sedona/common/raster/Outputs.java b/common/src/main/java/org/apache/sedona/common/raster/Outputs.java
new file mode 100644
index 00000000..9d6f5bed
--- /dev/null
+++ b/common/src/main/java/org/apache/sedona/common/raster/Outputs.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sedona.common.raster;
+
+import org.geotools.coverage.grid.GridCoverage2D;
+import org.geotools.coverage.grid.io.AbstractGridFormat;
+import org.geotools.gce.arcgrid.ArcGridWriteParams;
+import org.geotools.gce.arcgrid.ArcGridWriter;
+import org.geotools.gce.geotiff.GeoTiffWriteParams;
+import org.geotools.gce.geotiff.GeoTiffWriter;
+import org.opengis.coverage.grid.GridCoverageWriter;
+import org.opengis.parameter.GeneralParameterValue;
+import org.opengis.parameter.ParameterValueGroup;
+
+import javax.imageio.ImageWriteParam;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+/**
+ * Encoders that serialize a {@link GridCoverage2D} raster into an in-memory byte array.
+ */
+public class Outputs
+{
+    /**
+     * Encodes a raster as a GeoTiff image.
+     *
+     * @param raster the coverage to encode
+     * @param compressionType compression type name (None, PackBits, Deflate, Huffman, LZW or JPEG),
+     *        or {@code null} to keep the writer's default parameters
+     * @param compressionQuality value in [0, 1]; when it is outside this range the writer's default
+     *        parameters are kept and {@code compressionType} is ignored
+     * @return the encoded GeoTiff bytes
+     * @throws RuntimeException wrapping any {@link IOException} raised while creating the writer,
+     *         encoding the raster or disposing the writer
+     */
+    public static byte[] asGeoTiff(GridCoverage2D raster, String compressionType, float compressionQuality) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        GridCoverageWriter writer;
+        try {
+            writer = new GeoTiffWriter(out);
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        try {
+            ParameterValueGroup defaultParams = writer.getFormat().getWriteParameters();
+            if (compressionType != null && compressionQuality >= 0 && compressionQuality <= 1) {
+                GeoTiffWriteParams params = new GeoTiffWriteParams();
+                params.setCompressionMode(ImageWriteParam.MODE_EXPLICIT);
+                // Available compression types: None, PackBits, Deflate, Huffman, LZW and JPEG
+                params.setCompressionType(compressionType);
+                // Should be a value between 0 and 1
+                // 0 favors maximum compression, 1 favors maximum image quality
+                params.setCompressionQuality(compressionQuality);
+                defaultParams.parameter(AbstractGridFormat.GEOTOOLS_WRITE_PARAMS.getName().toString()).setValue(params);
+            }
+            writer.write(raster, defaultParams.values().toArray(new GeneralParameterValue[0]));
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        finally {
+            // Release the writer even when parameter setup or encoding fails
+            dispose(writer);
+        }
+        // ByteArrayOutputStream.close() has no effect, so the buffer is read without closing it
+        return out.toByteArray();
+    }
+
+    /**
+     * Encodes one band of a raster as an ArcGrid image.
+     *
+     * @param raster the coverage to encode
+     * @param sourceBand index of the band to export; a negative value keeps the writer's
+     *        default parameters
+     * @return the encoded ArcGrid bytes
+     * @throws RuntimeException wrapping any {@link IOException} raised while creating the writer,
+     *         encoding the raster or disposing the writer
+     */
+    public static byte[] asArcGrid(GridCoverage2D raster, int sourceBand) {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        GridCoverageWriter writer;
+        try {
+            writer = new ArcGridWriter(out);
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        try {
+            ParameterValueGroup defaultParams = writer.getFormat().getWriteParameters();
+            if (sourceBand >= 0) {
+                ArcGridWriteParams params = new ArcGridWriteParams();
+                params.setSourceBands(new int[]{sourceBand});
+                defaultParams.parameter(AbstractGridFormat.GEOTOOLS_WRITE_PARAMS.getName().toString()).setValue(params);
+            }
+            writer.write(raster, defaultParams.values().toArray(new GeneralParameterValue[0]));
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        finally {
+            // Release the writer even when parameter setup or encoding fails
+            dispose(writer);
+        }
+        // ByteArrayOutputStream.close() has no effect, so the buffer is read without closing it
+        return out.toByteArray();
+    }
+
+    /**
+     * Disposes a writer, converting the checked {@link IOException} into a {@link RuntimeException}
+     * the same way the public methods do.
+     */
+    private static void dispose(GridCoverageWriter writer) {
+        try {
+            writer.dispose();
+        }
+        catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/core/src/test/resources/raster_geotiff_color/FAA_UTM18N_NAD83.tif b/core/src/test/resources/raster_geotiff_color/FAA_UTM18N_NAD83.tif
new file mode 100644
index 00000000..a0a38aa6
Binary files /dev/null and b/core/src/test/resources/raster_geotiff_color/FAA_UTM18N_NAD83.tif differ
diff --git a/docs/api/sql/Raster-writer.md b/docs/api/sql/Raster-writer.md
index a2663c31..eded6509 100644
--- a/docs/api/sql/Raster-writer.md
+++ b/docs/api/sql/Raster-writer.md
@@ -1,6 +1,170 @@
 !!!note
 	Sedona writers are available in Scala, Java and Python and have the same APIs.
 	
+## Write Raster DataFrame to raster files
+
+To write a Sedona Raster DataFrame to raster files, you need to (1) first convert the Raster DataFrame to a binary DataFrame using `RS_AsXXX` functions and (2) then write the binary DataFrame to raster files using Sedona's built-in `raster` data source.
+
+### Write raster DataFrame to a binary DataFrame
+
+You can use the following RS output functions (`RS_AsXXX`) to convert a Raster DataFrame to a binary DataFrame. Generally the output format of a raster can be different from the original input format. For example, you can use `RS_FromGeoTiff` to create rasters and save them using `RS_AsArcGrid`.
+
+#### RS_AsGeoTiff
+
+Introduction: Returns a binary DataFrame from a Raster DataFrame. Each raster object in the resulting DataFrame is a GeoTiff image in binary format.
+
+Since: `v1.4.1`
+
+Format 1: `RS_AsGeoTiff(raster: Raster)`
+
+Format 2: `RS_AsGeoTiff(raster: Raster, compressionType:String, imageQuality:Integer/Decimal)`
+
+Possible values for `compressionType`: `None`, `PackBits`, `Deflate`, `Huffman`, `LZW` and `JPEG`
+
+Possible values for `imageQuality`: any decimal number between 0 and 1. 0 means the lowest quality and 1 means the highest quality.
+
+SQL example 1:
+
+```sql
+SELECT RS_AsGeoTiff(raster) FROM my_raster_table
+```
+
+SQL example 2:
+
+```sql
+SELECT RS_AsGeoTiff(raster, 'LZW', 0.75) FROM my_raster_table
+```
+
+Output:
+
+```html
++--------------------+
+|             geotiff|
++--------------------+
+|[4D 4D 00 2A 00 0...|
++--------------------+
+```
+
+Output schema:
+
+```sql
+root
+ |-- geotiff: binary (nullable = true)
+```
+
+#### RS_AsArcGrid
+
+Introduction: Returns a binary DataFrame from a Raster DataFrame. Each raster object in the resulting DataFrame is an ArcGrid image in binary format. ArcGrid only takes 1 source band. If your raster has multiple bands, you need to specify which band you want to use as the source.
+
+Since: `v1.4.1`
+
+Format 1: `RS_AsArcGrid(raster: Raster)`
+
+Format 2: `RS_AsArcGrid(raster: Raster, sourceBand:Integer)`
+
+Possible values for `sourceBand`: any non-negative value (>=0). If not given, it will use Band 0.
+
+SQL example 1:
+
+```sql
+SELECT RS_AsArcGrid(raster) FROM my_raster_table
+```
+
+SQL example 2:
+
+```sql
+SELECT RS_AsArcGrid(raster, 1) FROM my_raster_table
+```
+
+Output:
+
+```html
++--------------------+
+|             arcgrid|
++--------------------+
+|[4D 4D 00 2A 00 0...|
++--------------------+
+```
+
+Output schema:
+
+```sql
+root
+ |-- arcgrid: binary (nullable = true)
+```
+
+### Write a binary DataFrame to raster files
+
+Introduction: You can write a Sedona binary DataFrame to external storage using Sedona's built-in `raster` data source. Note that: `raster` data source does not support reading rasters. Please use Spark built-in `binaryFile` and Sedona RS constructors together to read rasters.
+
+Since: `v1.4.1`
+
+Available options:
+
+* rasterField:
+	* Default value: the `binary` type column in the DataFrame. If the input DataFrame has several binary columns, please specify which column you want to use.
+	* Allowed values: the name of the to-be-saved binary type column
+* fileExtension
+	* Default value: `.tiff`
+	* Allowed values: any string values such as `.png`, `.jpeg`, `.asc`
+* pathField
+	* No default value. If you use this option, then the column specified in this option must exist in the DataFrame schema. If this option is not used, each produced raster image will have a random UUID file name.
+	* Allowed values: any column name that indicates the paths of each raster file
+
+The schema of the Raster dataframe to be written can be one of the following two schemas:
+
+```html
+root
+ |-- rs_asgeotiff(raster): binary (nullable = true)
+```
+
+or
+
+```html
+root
+ |-- rs_asgeotiff(raster): binary (nullable = true)
+ |-- path: string (nullable = true)
+```
+
+Spark SQL example 1:
+
+```scala
+df.write.format("raster").mode(SaveMode.Overwrite).save("my_raster_file")
+```
+
+Spark SQL example 2:
+
+```scala
+df.write.format("raster").option("rasterField", "raster").option("pathField", "path").option("fileExtension", ".tiff").mode(SaveMode.Overwrite).save("my_raster_file")
+```
+
+The produced file structure will look like this:
+
+```html
+my_raster_file
+- part-00000-6c7af016-c371-4564-886d-1690f3b27ca8-c000
+	- test1.tiff
+	- .test1.tiff.crc
+- part-00001-6c7af016-c371-4564-886d-1690f3b27ca8-c000
+	- test2.tiff
+	- .test2.tiff.crc
+- part-00002-6c7af016-c371-4564-886d-1690f3b27ca8-c000
+	- test3.tiff
+	- .test3.tiff.crc
+- _SUCCESS
+```
+
+To read it back to Sedona Raster DataFrame, you can use the following command (note the `*` in the path):
+
+```scala
+sparkSession.read.format("binaryFile").load("my_raster_file/*")
+```
+
+Then you can create Raster type in Sedona like this `RS_FromGeoTiff(content)` (if the written data was in GeoTiff format).
+
+The newly created DataFrame can be written to disk again but must be under a different name such as `my_raster_file_modified`
+
+
 ## Write Array[Double] to GeoTiff files
 
 Introduction: You can write a GeoTiff dataframe as GeoTiff images using the spark `write` feature with the format `geotiff`. The geotiff raster column needs to be an array of double type data.
diff --git a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
index 1c1347c1..9364eae5 100644
--- a/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
+++ b/sql/common/src/main/scala/org/apache/sedona/sql/UDF/Catalog.scala
@@ -176,7 +176,9 @@ object Catalog {
     function[RS_SetSRID](),
     function[RS_SRID](),
     function[RS_Value](1),
-    function[RS_Values](1)
+    function[RS_Values](1),
+    function[RS_AsGeoTiff](),
+    function[RS_AsArcGrid]()
   )
 
   val aggregateExpressions: Seq[Aggregator[Geometry, Geometry, Geometry]] = Seq(
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
index fc4fbb6e..f526baf0 100644
--- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
+++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/NullSafeExpressions.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.util.ArrayData
-import org.apache.spark.sql.sedona_sql.expressions.implicits._
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
+import org.apache.spark.sql.sedona_sql.expressions.implicits._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.locationtech.jts.geom.Geometry
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Outputs.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Outputs.scala
new file mode 100644
index 00000000..632c9c46
--- /dev/null
+++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/expressions/raster/Outputs.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.spark.sql.sedona_sql.expressions.raster
+
+import org.apache.sedona.common.raster.Outputs
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.sedona_sql.expressions.raster.implicits.RasterInputExpressionEnhancer
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+  * Serializes a raster into GeoTiff bytes.
+  *
+  * Expected types: (RasterUDT), (RasterUDT, StringType, IntegerType) or (RasterUDT, StringType, DecimalType).
+  * The optional second and third arguments are the compression type and the image quality.
+  * The result is null when the raster is null.
+  */
+case class RS_AsGeoTiff(inputExpressions: Seq[Expression]) extends Expression with CodegenFallback {
+
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = {
+    val raster = inputExpressions(0).toRaster(input)
+    if (raster == null) return null
+    inputExpressions.length match {
+      case 1 =>
+        // null / -1 make Outputs.asGeoTiff keep the writer's default parameters
+        Outputs.asGeoTiff(raster, null, -1)
+      case 2 =>
+        // The compression type and the image quality must be given together
+        throw new IllegalArgumentException("RS_AsGeoTiff expects either 1 argument (raster) or 3 arguments (raster, compressionType, imageQuality)")
+      case _ =>
+        val compressionType = inputExpressions(1).eval(input)
+        val imageQuality = inputExpressions(2).eval(input)
+        if (compressionType == null || imageQuality == null) {
+          // A null parameter cannot be converted; fall back to the writer's default parameters
+          Outputs.asGeoTiff(raster, null, -1)
+        } else {
+          Outputs.asGeoTiff(raster, compressionType.asInstanceOf[UTF8String].toString, imageQuality.toString.toFloat)
+        }
+    }
+  }
+
+  override def dataType: DataType = BinaryType
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
+    copy(inputExpressions = newChildren)
+  }
+}
+
+/**
+  * Serializes a raster into ArcGrid bytes.
+  *
+  * Takes the raster and, optionally, the index of the band to export. Without the second
+  * argument -1 is passed to Outputs.asArcGrid. The result is null when the raster is null.
+  */
+case class RS_AsArcGrid(inputExpressions: Seq[Expression]) extends Expression with CodegenFallback {
+
+  override def nullable: Boolean = true
+
+  override def eval(input: InternalRow): Any = {
+    val raster = inputExpressions(0).toRaster(input)
+    if (raster == null) {
+      null
+    } else {
+      // Any expression after the raster is interpreted as the source band
+      val sourceBand = if (inputExpressions.length > 1) inputExpressions(1).eval(input).asInstanceOf[Int] else -1
+      Outputs.asArcGrid(raster, sourceBand)
+    }
+  }
+
+  override def dataType: DataType = BinaryType
+
+  override def children: Seq[Expression] = inputExpressions
+
+  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]) = {
+    copy(inputExpressions = newChildren)
+  }
+}
\ No newline at end of file
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala
deleted file mode 100644
index 54c5377f..00000000
--- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/HadoopUtils.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.spark.sql.sedona_sql.io
-
-import org.apache.commons.io.FilenameUtils
-import org.apache.hadoop.conf.{Configuration, Configured}
-import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
-import org.apache.spark.sql.SparkSession
-
-import scala.language.existentials
-import scala.util.Random
-
-object RecursiveFlag {
-
-  /** Sets a value of spark recursive flag
-   *
-   * @param value value to set
-   * @param spark existing spark session
-   * @return previous value of this flag
-   */
-  def setRecursiveFlag(value: Option[String], spark: SparkSession): Option[String] = {
-    val flagName = FileInputFormat.INPUT_DIR_RECURSIVE
-    val hadoopConf = spark.sparkContext.hadoopConfiguration
-    val old = Option(hadoopConf.get(flagName))
-
-    value match {
-      case Some(v) => hadoopConf.set(flagName, v)
-      case None => hadoopConf.unset(flagName)
-    }
-
-    old
-  }
-}
-
-
-/** Filter that allows loading a fraction of HDFS files. */
-class SamplePathFilter extends Configured with PathFilter {
-  val random = {
-    val rd = new Random()
-    rd.setSeed(0)
-    rd
-  }
-
-  // Ratio of files to be read from disk
-  var sampleRatio: Double = 1
-
-  override def setConf(conf: Configuration): Unit = {
-    if (conf != null) {
-      sampleRatio = conf.getDouble(SamplePathFilter.ratioParam, 1)
-    }
-  }
-
-  override def accept(path: Path): Boolean = {
-    // Note: checking fileSystem.isDirectory is very slow here, so we use basic rules instead
-    !SamplePathFilter.isFile(path) ||
-      random.nextDouble() < sampleRatio
-  }
-}
-
-object SamplePathFilter {
-  val ratioParam = "sampleRatio"
-
-  def isFile(path: Path): Boolean = FilenameUtils.getExtension(path.toString) != ""
-
-  /** Set/unset  hdfs PathFilter
-   *
-   * @param value       Filter class that is passed to HDFS
-   * @param sampleRatio Fraction of the files that the filter picks
-   * @param spark       Existing Spark session
-   * @return
-   */
-  def setPathFilter(value: Option[Class[_]], sampleRatio: Option[Double] = None, spark: SparkSession)
-  : Option[Class[_]] = {
-    val flagName = FileInputFormat.PATHFILTER_CLASS
-    val hadoopConf = spark.sparkContext.hadoopConfiguration
-    val old = Option(hadoopConf.getClass(flagName, null))
-    if (sampleRatio.isDefined) {
-      hadoopConf.setDouble(SamplePathFilter.ratioParam, sampleRatio.get)
-    } else {
-      hadoopConf.unset(SamplePathFilter.ratioParam)
-      None
-    }
-
-    value match {
-      case Some(v) => hadoopConf.setClass(flagName, v, classOf[PathFilter])
-      case None => hadoopConf.unset(flagName)
-    }
-    old
-  }
-}
\ No newline at end of file
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffSchema.scala
similarity index 85%
rename from sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala
rename to sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffSchema.scala
index 5a3a3595..90c0ec55 100644
--- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffSchema.scala
+++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffSchema.scala
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.io
+package org.apache.spark.sql.sedona_sql.io.raster
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.sedona_sql.UDT.GeometryUDT
@@ -38,8 +38,8 @@ object GeotiffSchema {
   val undefinedImageType = "Undefined"
 
   /**
-   * Schema for the image column: Row(String,Geometry, Int, Int, Int, Array[Double])
-   */
+    * Schema for the image column: Row(String,Geometry, Int, Int, Int, Array[Double])
+    */
   val columnSchema = StructType(
     StructField("origin", StringType, true) ::
       StructField("geometry", StringType, true) ::
@@ -51,73 +51,72 @@ object GeotiffSchema {
   val imageFields: Array[String] = columnSchema.fieldNames
 
   /**
-   * DataFrame with a single column of images named "image" (nullable)
-   */
+    * DataFrame with a single column of images named "image" (nullable)
+    */
   val imageSchema = StructType(StructField("image", columnSchema, true) :: Nil)
 
   /**
-   * Gets the origin of the image
-   *
-   * @return The origin of the image
-   */
+    * Gets the origin of the image
+    *
+    * @return The origin of the image
+    */
   def getOrigin(row: Row): String = row.getString(0)
 
   /**
-   * Gets the origin of the image
-   *
-   * @return The origin of the image
-   */
+    * Gets the origin of the image
+    *
+    * @return The origin of the image
+    */
   def getGeometry(row: Row): GeometryUDT = row.getAs[GeometryUDT](1)
 
 
   /**
-   * Gets the height of the image
-   *
-   * @return The height of the image
-   */
+    * Gets the height of the image
+    *
+    * @return The height of the image
+    */
   def getHeight(row: Row): Int = row.getInt(2)
 
   /**
-   * Gets the width of the image
-   *
-   * @return The width of the image
-   */
+    * Gets the width of the image
+    *
+    * @return The width of the image
+    */
   def getWidth(row: Row): Int = row.getInt(3)
 
   /**
-   * Gets the number of channels in the image
-   *
-   * @return The number of bands in the image
-   */
+    * Gets the number of channels in the image
+    *
+    * @return The number of bands in the image
+    */
   def getNBands(row: Row): Int = row.getInt(4)
 
 
   /**
-   * Gets the image data
-   *
-   * @return The image data
-   */
+    * Gets the image data
+    *
+    * @return The image data
+    */
   def getData(row: Row): Array[Double] = row.getAs[Array[Double]](5)
 
   /**
-   * Default values for the invalid image
-   *
-   * @param origin Origin of the invalid image
-   * @return Row with the default values
-   */
+    * Default values for the invalid image
+    *
+    * @param origin Origin of the invalid image
+    * @return Row with the default values
+    */
   private[io] def invalidImageRow(origin: String): Row =
     Row(Row(origin, -1, -1, -1, Array.ofDim[Byte](0)))
 
   /**
-   *
-   * Convert a GeoTiff image into a dataframe row
-   *
-   *
-   * @param origin Arbitrary string that identifies the image
-   * @param bytes  Image bytes (for example, jpeg)
-   * @return DataFrame Row or None (if the decompression fails)
-   *
-   */
+    *
+    * Convert a GeoTiff image into a dataframe row
+    *
+    * @param origin Arbitrary string that identifies the image
+    * @param bytes  Image bytes (for example, jpeg)
+    * @return DataFrame Row or None (if the decompression fails)
+    *
+    */
 
   private[io] def decode(origin: String, bytes: Array[Byte], imageSourceOptions: ImageReadOptions): Option[Row] = {
 
@@ -215,8 +214,3 @@ object GeotiffSchema {
     Some(Row(Row(origin, polygon.toText, height, width, nBands, decoded)))
   }
 }
-
-
-
-
-
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageReadOptions.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageReadOptions.scala
similarity index 97%
rename from sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageReadOptions.scala
rename to sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageReadOptions.scala
index f73fc7cf..552b8f8e 100644
--- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageReadOptions.scala
+++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageReadOptions.scala
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.io
+package org.apache.spark.sql.sedona_sql.io.raster
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageWriteOptions.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageWriteOptions.scala
similarity index 96%
copy from sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageWriteOptions.scala
copy to sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageWriteOptions.scala
index 8653c93a..6a730faa 100644
--- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageWriteOptions.scala
+++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/ImageWriteOptions.scala
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.io
+package org.apache.spark.sql.sedona_sql.io.raster
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
new file mode 100644
index 00000000..abf11c9e
--- /dev/null
+++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterFileFormat.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.spark.sql.sedona_sql.io.raster
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.StructType
+
+import java.io.IOException
+import java.nio.file.Paths
+import java.util.UUID
+
+/**
+  * Write-only data source registered under the short name "raster".
+  * It saves the binary column of a DataFrame as individual raster files.
+  */
+private[spark] class RasterFileFormat extends FileFormat with DataSourceRegister {
+
+  /**
+    * Reading is not supported by this data source, so schema inference always fails.
+    *
+    * @throws UnsupportedOperationException always
+    */
+  override def inferSchema(
+                            sparkSession: SparkSession,
+                            options: Map[String, String],
+                            files: Seq[FileStatus]): Option[StructType] = {
+    throw new UnsupportedOperationException("Please use 'binaryFile' data source to reading raster files")
+  }
+
+  /**
+    * Validates the schema and builds the factory that creates one RasterFileWriter per output path.
+    *
+    * @throws IllegalArgumentException if the schema has no binary column
+    */
+  override def prepareWrite(
+                             sparkSession: SparkSession,
+                             job: Job,
+                             options: Map[String, String],
+                             dataSchema: StructType): OutputWriterFactory = {
+    val rasterOptions = new RasterOptions(options)
+    if (!isValidRasterSchema(dataSchema)) {
+      throw new IllegalArgumentException("Invalid Raster DataFrame Schema")
+    }
+
+    new OutputWriterFactory {
+      // No extension here: the extension is appended to each raster file by the writer
+      override def getFileExtension(context: TaskAttemptContext): String = ""
+
+      override def newInstance(path: String, dataSchema: StructType, context: TaskAttemptContext): OutputWriter = {
+        new RasterFileWriter(path, rasterOptions, dataSchema, context)
+      }
+    }
+  }
+
+  override def shortName(): String = "raster"
+
+  // A schema is writable when at least one of its columns is of binary type
+  private def isValidRasterSchema(dataSchema: StructType): Boolean =
+    dataSchema.fields.exists(_.dataType.typeName.equals("binary"))
+
+}
+
+/**
+  * Writes each non-null raster of a binary column as its own file under `savePath`.
+  *
+  * @param savePath      directory-like output path that receives the raster files
+  * @param rasterOptions options selecting the raster column, the file name column and the file extension
+  * @param dataSchema    schema of the rows passed to `write`
+  * @param context       task context providing the Hadoop configuration
+  */
+private class RasterFileWriter(savePath: String,
+                               rasterOptions: RasterOptions,
+                                dataSchema: StructType,
+                                context: TaskAttemptContext) extends OutputWriter {
+
+  private val hfs = new Path(savePath).getFileSystem(context.getConfiguration)
+  // Use the column named by the rasterField option when given, otherwise detect a binary column
+  private val rasterFieldIndex = rasterOptions.rasterField match {
+    case Some(fieldName) => dataSchema.fieldIndex(fieldName)
+    case None => getRasterFieldIndex
+  }
+
+  // Index of the last binary column of the schema, or -1 when there is none
+  private def getRasterFieldIndex: Int =
+    dataSchema.fields.lastIndexWhere(_.dataType.typeName.equals("binary"))
+
+  /**
+    * Saves the raster bytes of one row. Rows whose raster is null are skipped.
+    * An IOException raised while creating or writing the file is propagated, so that a failed
+    * write fails the task instead of silently dropping the raster.
+    */
+  override def write(row: InternalRow): Unit = {
+    val rasterRaw = row.getBinary(rasterFieldIndex)
+    // If the raster is null, there is nothing to write
+    if (rasterRaw == null) return
+    val rasterFilePath = getRasterFilePath(row, dataSchema, rasterOptions)
+    // Only the file name is kept; it is resolved with Hadoop's Path because java.nio.file.Paths
+    // collapses the "//" of a URI such as hdfs://host/dir
+    val out = hfs.create(new Path(savePath, new Path(rasterFilePath).getName))
+    try {
+      out.write(rasterRaw)
+    } finally {
+      // Close the stream even when the write fails
+      out.close()
+    }
+  }
+
+  override def close(): Unit = {
+    // NOTE(review): getFileSystem may return an instance shared through Hadoop's FileSystem cache;
+    // confirm that closing it here cannot affect other tasks running in the same JVM
+    hfs.close()
+  }
+
+  def path(): String = {
+    savePath
+  }
+
+  /**
+    * Builds the file name of a raster: the value of the pathField column without its extension,
+    * or a random UUID when the option is not set or the value is null, followed by the
+    * configured file extension.
+    */
+  private def getRasterFilePath(row: InternalRow, schema: StructType, rasterOptions: RasterOptions): String = {
+    val providedPath = rasterOptions.rasterPathField.flatMap { fieldName =>
+      val fieldIndex = schema.fieldIndex(fieldName)
+      // Test isNullAt first: getString cannot be relied on to return null for a null value
+      if (row.isNullAt(fieldIndex)) None else Option(row.getString(fieldIndex))
+    }
+    val baseName = providedPath match {
+      case Some(rawPath) =>
+        // Remove the extension if it exists; a dot inside a directory name is not an extension
+        val lastDot = rawPath.lastIndexOf('.')
+        if (lastDot > rawPath.lastIndexOf('/')) rawPath.substring(0, lastDot) else rawPath
+      case None =>
+        UUID.randomUUID().toString
+    }
+    baseName + rasterOptions.fileExtension
+  }
+}
diff --git a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageWriteOptions.scala b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterOptions.scala
similarity index 57%
rename from sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageWriteOptions.scala
rename to sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterOptions.scala
index 8653c93a..a518c0a3 100644
--- a/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/ImageWriteOptions.scala
+++ b/sql/common/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/RasterOptions.scala
@@ -16,21 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.spark.sql.sedona_sql.io
+package org.apache.spark.sql.sedona_sql.io.raster
 
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 
-private[io] class ImageWriteOptions(@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
+private[io] class RasterOptions(@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {
   def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
 
-  // Optional parameters for writing GeoTiff
-  val writeToCRS = parameters.getOrElse("writeToCRS", "EPSG:4326")
-  val colImage = parameters.getOrElse("fieldImage", "image")
-  val colOrigin = parameters.getOrElse("fieldOrigin", "origin")
-  val colBands = parameters.getOrElse("fieldNBands", "nBands")
-  val colWidth = parameters.getOrElse("fieldWidth", "width")
-  val colHeight = parameters.getOrElse("fieldHeight", "height")
-  val colGeometry = parameters.getOrElse("fieldGeometry", "geometry")
-  val colData = parameters.getOrElse("fieldData", "data")
-
+  // File extension appended to each written raster file name ("fileExtension" option); defaults to ".tiff"
+  val fileExtension = parameters.getOrElse("fileExtension", ".tiff")
+  // Optional name of the column holding the raster file path/name ("pathField" option)
+  val rasterPathField = parameters.get("pathField")
+  // Optional name of the column holding the raster image itself ("rasterField" option)
+  val rasterField = parameters.get("rasterField")
 }
\ No newline at end of file
diff --git a/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala b/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
index f3854295..bc189785 100644
--- a/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
+++ b/sql/common/src/test/scala/org/apache/sedona/sql/rasteralgebraTest.scala
@@ -20,6 +20,7 @@ package org.apache.sedona.sql
 
 import org.apache.spark.sql.functions.{collect_list, expr}
 import org.geotools.coverage.grid.GridCoverage2D
+import org.junit.Assert.assertEquals
 import org.locationtech.jts.geom.Geometry
 import org.scalatest.{BeforeAndAfter, GivenWhenThen}
 
@@ -339,5 +340,48 @@ class rasteralgebraTest extends TestBaseScala with BeforeAndAfter with GivenWhen
       assert(result.get(0) == 255d)
       assert(result.get(1) == null)
     }
+
+    it("Passed RS_AsGeoTiff") {
+      // Round trip: GeoTiff bytes -> raster -> GeoTiff bytes -> raster; the envelope must be unchanged.
+      val binaryDf = sparkSession.read.format("binaryFile").load(resourceFolder + "raster/*")
+      val decodedDf = binaryDf.selectExpr("RS_FromGeoTiff(content) as raster")
+      val originalRaster = decodedDf.first().get(0)
+      val roundTrippedRaster = decodedDf
+        .selectExpr("RS_AsGeoTiff(raster) as geotiff")
+        .selectExpr("RS_FromGeoTiff(geotiff) as raster_new")
+        .first()
+        .get(0)
+      assert(roundTrippedRaster != null)
+      assert(roundTrippedRaster.isInstanceOf[GridCoverage2D])
+      val expectedEnvelope = originalRaster.asInstanceOf[GridCoverage2D].getEnvelope.toString
+      val actualEnvelope = roundTrippedRaster.asInstanceOf[GridCoverage2D].getEnvelope.toString
+      assertEquals(expectedEnvelope, actualEnvelope)
+    }
+
+    it("Passed RS_AsGeoTiff with different compression types") {
+      // Encode the same raster twice with different compression settings, decode the first
+      // encoding again, and compare the envelope and the encoded sizes.
+      val df = sparkSession.read.format("binaryFile").load(resourceFolder + "raster/test1.tiff")
+      val resultRaw = df.selectExpr("RS_FromGeoTiff(content) as raster").first().get(0)
+      val resultLoadedDf = df.selectExpr("RS_FromGeoTiff(content) as raster")
+        .withColumn("geotiff", expr("RS_AsGeoTiff(raster, 'LZW', 1)"))
+        .withColumn("geotiff2", expr("RS_AsGeoTiff(raster, 'Deflate', 0.5)"))
+        .withColumn("raster_new", expr("RS_FromGeoTiff(geotiff)"))
+      // Fetch the row once: every first() call runs the whole pipeline again, and all three
+      // values below should come from the same evaluation.
+      val firstRow = resultLoadedDf.first()
+      val resultLoaded = firstRow.getAs[GridCoverage2D]("raster_new")
+      val writtenBinary1 = firstRow.getAs[Array[Byte]]("geotiff")
+      val writtenBinary2 = firstRow.getAs[Array[Byte]]("geotiff2")
+      assertEquals(resultRaw.asInstanceOf[GridCoverage2D].getEnvelope.toString, resultLoaded.getEnvelope.toString)
+      // The 'LZW' output is expected to be larger than the 'Deflate' output
+      assert(writtenBinary1.length > writtenBinary2.length)
+    }
+
+    it("Passed RS_AsArcGrid") {
+      // Round trip: Arc Grid bytes -> raster -> Arc Grid bytes -> raster; the envelope must be unchanged.
+      val binaryDf = sparkSession.read.format("binaryFile").load(resourceFolder + "raster_asc/*")
+      val decodedDf = binaryDf.selectExpr("RS_FromArcInfoAsciiGrid(content) as raster")
+      val originalRaster = decodedDf.first().get(0)
+      val roundTrippedRaster = decodedDf
+        .selectExpr("RS_AsArcGrid(raster) as arcgrid")
+        .selectExpr("RS_FromArcInfoAsciiGrid(arcgrid) as raster_new")
+        .first()
+        .get(0)
+      assert(roundTrippedRaster != null)
+      assert(roundTrippedRaster.isInstanceOf[GridCoverage2D])
+      val expectedEnvelope = originalRaster.asInstanceOf[GridCoverage2D].getEnvelope.toString
+      val actualEnvelope = roundTrippedRaster.asInstanceOf[GridCoverage2D].getEnvelope.toString
+      assertEquals(expectedEnvelope, actualEnvelope)
+    }
+
+    it("Passed RS_AsArcGrid with different bands") {
+      // Export the color GeoTiff as Arc Grid with band arguments 0 and 1, parse both exports
+      // back, and compare row counts.
+      // NOTE(review): count() may not force evaluation of the projected expressions - confirm
+      // this check really exercises RS_FromArcInfoAsciiGrid.
+      val colorRasterDf = sparkSession.read.format("binaryFile")
+        .load(resourceFolder + "raster_geotiff_color/*")
+        .selectExpr("RS_FromGeoTiff(content) as raster")
+      val arcGridDf = colorRasterDf.selectExpr("RS_AsArcGrid(raster, 0) as arc", "RS_AsArcGrid(raster, 1) as arc2")
+      val reparsedDf = arcGridDf.selectExpr("RS_FromArcInfoAsciiGrid(arc) as raster", "RS_FromArcInfoAsciiGrid(arc2) as raster2")
+      val exportedRows = arcGridDf.count()
+      val reparsedRows = reparsedDf.count()
+      assertEquals(exportedRows, reparsedRows)
+    }
   }
 }
diff --git a/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 68ea723a..4352e818 100644
--- a/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/spark-3.0/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,2 +1,3 @@
-org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat
-org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.sedona_sql.io.raster.GeotiffFileFormat
+org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
+org.apache.spark.sql.sedona_sql.io.raster.RasterFileFormat
\ No newline at end of file
diff --git a/sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala b/sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
similarity index 99%
rename from sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
rename to sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
index 842e28f3..f3360ae3 100644
--- a/sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
+++ b/sql/spark-3.0/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
@@ -18,21 +18,20 @@
  */
 
 
-package org.apache.spark.sql.sedona_sql.io
+package org.apache.spark.sql.sedona_sql.io.raster
 
 import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 import org.apache.sedona.sql.utils.GeometrySerializer
-import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory, PartitionedFile}
 import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.SerializableConfiguration
 import org.geotools.coverage.CoverageFactoryFinder
diff --git a/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala b/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
index 7206ac38..8a6b56ec 100644
--- a/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
+++ b/sql/spark-3.0/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -19,15 +19,19 @@
 
 package org.apache.sedona.sql
 
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SaveMode
 import org.locationtech.jts.geom.Geometry
 import org.scalatest.{BeforeAndAfter, GivenWhenThen}
 
 import java.io.File
+import java.nio.file.Files
 import scala.collection.mutable
 
 class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen {
 
   var rasterdatalocation: String = resourceFolder + "raster/"
+  val tempDir: String = Files.createTempDirectory("sedona_raster_io_test_").toFile.getAbsolutePath
 
   describe("Raster IO test") {
     it("Should Pass geotiff loading without readFromCRS and readToCRS") {
@@ -158,7 +162,7 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
     it("Should Pass geotiff file writing with coalesce") {
       var df = sparkSession.read.format("geotiff").option("dropInvalid", true).option("readToCRS", "EPSG:4326").load(rasterdatalocation)
       df = df.selectExpr("image.origin as origin","image.geometry as geometry", "image.height as height", "image.width as width", "image.data as data", "image.nBands as nBands")
-      val savePath = resourceFolder + "raster-written/"
+      val savePath = tempDir + "/raster-written/"
       df.coalesce(1).write.mode("overwrite").format("geotiff").save(savePath)
 
       var loadPath = savePath
@@ -185,7 +189,7 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
     it("Should Pass geotiff file writing with writeToCRS") {
       var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(rasterdatalocation)
       df = df.selectExpr("image.origin as origin","image.geometry as geometry", "image.height as height", "image.width as width", "image.data as data", "image.nBands as nBands")
-      val savePath = resourceFolder + "raster-written/"
+      val savePath = tempDir + "/raster-written/"
       df.coalesce(1).write.mode("overwrite").format("geotiff").option("writeToCRS", "EPSG:4499").save(savePath)
 
       var loadPath = savePath
@@ -212,7 +216,7 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
     it("Should Pass geotiff file writing without coalesce") {
       var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(rasterdatalocation)
       df = df.selectExpr("image.origin as origin","image.geometry as geometry", "image.height as height", "image.width as width", "image.data as data", "image.nBands as nBands")
-      val savePath = resourceFolder + "raster-written/"
+      val savePath = tempDir + "/raster-written/"
       df.write.mode("overwrite").format("geotiff").save(savePath)
 
       var imageCount = 0
@@ -347,11 +351,56 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
         }
       }
     }
-    
-  }
-}
 
+    it("should read geotiff using binary source and write geotiff back to disk using raster source") {
+      // Write the binary-file rows through the "raster" sink with default options, then reload and count.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val expectedCount = sourceDf.count()
+      sourceDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
 
+    it("should read and write geotiff using given options") {
+      // Same round trip, but with the raster column, file extension and path column set explicitly.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val expectedCount = sourceDf.count()
+      sourceDf.write.format("raster")
+        .option("rasterField", "content")
+        .option("fileExtension", ".tiff")
+        .option("pathField", "path")
+        .mode(SaveMode.Overwrite)
+        .save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
 
+    it("should read and write via RS_FromGeoTiff and RS_AsGeoTiff") {
+      // Decode each file to a raster, re-encode it with RS_AsGeoTiff, write it out, then reload and count.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val encodedDf = sourceDf
+        .selectExpr("RS_FromGeoTiff(content) as raster", "path")
+        .selectExpr("RS_AsGeoTiff(raster) as content", "path")
+      val expectedCount = encodedDf.count()
+      encodedDf.write.format("raster")
+        .option("rasterField", "content")
+        .option("fileExtension", ".tiff")
+        .option("pathField", "path")
+        .mode(SaveMode.Overwrite)
+        .save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
+
+    it("should handle null") {
+      // Every row carries a null raster; the test expects three input rows and no files read back.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val nullRasterDf = sourceDf
+        .selectExpr("RS_FromGeoTiff(null) as raster", "length")
+        .selectExpr("RS_AsGeoTiff(raster) as content", "length")
+      val inputCount = nullRasterDf.count()
+      nullRasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(inputCount == 3)
+      assert(reloadedDf.count() == 0)
+    }
 
+    it("should read RS_FromGeoTiff and write RS_AsArcGrid") {
+      // Decode the color GeoTiff, export it as Arc Grid with band argument 1, write ".asc" files, then reload and count.
+      val sourceDf = sparkSession.read.format("binaryFile").load(resourceFolder + "raster_geotiff_color/*")
+      val arcGridDf = sourceDf
+        .selectExpr("RS_FromGeoTiff(content) as raster", "path")
+        .selectExpr("RS_AsArcGrid(raster, 1) as content", "path")
+      val expectedCount = arcGridDf.count()
+      arcGridDf.write.format("raster")
+        .option("rasterField", "content")
+        .option("fileExtension", ".asc")
+        .option("pathField", "path")
+        .mode(SaveMode.Overwrite)
+        .save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromArcInfoAsciiGrid(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
+  }
 
+  // Delete the temporary output directory, then run the inherited teardown even if the
+  // deletion fails, so cleanup defined by the base suite is not skipped.
+  override def afterAll(): Unit = {
+    try FileUtils.deleteDirectory(new File(tempDir))
+    finally super.afterAll()
+  }
+}
\ No newline at end of file
diff --git a/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 68ea723a..4352e818 100644
--- a/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/sql/spark-3.4/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1,2 +1,3 @@
-org.apache.spark.sql.sedona_sql.io.GeotiffFileFormat
-org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
\ No newline at end of file
+org.apache.spark.sql.sedona_sql.io.raster.GeotiffFileFormat
+org.apache.spark.sql.execution.datasources.parquet.GeoParquetFileFormat
+org.apache.spark.sql.sedona_sql.io.raster.RasterFileFormat
\ No newline at end of file
diff --git a/sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala b/sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
similarity index 99%
rename from sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
rename to sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
index 7a1a99ed..318da263 100644
--- a/sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/GeotiffFileFormat.scala
+++ b/sql/spark-3.4/src/main/scala/org/apache/spark/sql/sedona_sql/io/raster/GeotiffFileFormat.scala
@@ -18,7 +18,7 @@
  */
 
 
-package org.apache.spark.sql.sedona_sql.io
+package org.apache.spark.sql.sedona_sql.io.raster
 
 import com.google.common.io.{ByteStreams, Closeables}
 import org.apache.hadoop.conf.Configuration
diff --git a/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala b/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
index 7206ac38..8a6b56ec 100644
--- a/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
+++ b/sql/spark-3.4/src/test/scala/org/apache/sedona/sql/rasterIOTest.scala
@@ -19,15 +19,19 @@
 
 package org.apache.sedona.sql
 
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.SaveMode
 import org.locationtech.jts.geom.Geometry
 import org.scalatest.{BeforeAndAfter, GivenWhenThen}
 
 import java.io.File
+import java.nio.file.Files
 import scala.collection.mutable
 
 class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen {
 
   var rasterdatalocation: String = resourceFolder + "raster/"
+  val tempDir: String = Files.createTempDirectory("sedona_raster_io_test_").toFile.getAbsolutePath
 
   describe("Raster IO test") {
     it("Should Pass geotiff loading without readFromCRS and readToCRS") {
@@ -158,7 +162,7 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
     it("Should Pass geotiff file writing with coalesce") {
       var df = sparkSession.read.format("geotiff").option("dropInvalid", true).option("readToCRS", "EPSG:4326").load(rasterdatalocation)
       df = df.selectExpr("image.origin as origin","image.geometry as geometry", "image.height as height", "image.width as width", "image.data as data", "image.nBands as nBands")
-      val savePath = resourceFolder + "raster-written/"
+      val savePath = tempDir + "/raster-written/"
       df.coalesce(1).write.mode("overwrite").format("geotiff").save(savePath)
 
       var loadPath = savePath
@@ -185,7 +189,7 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
     it("Should Pass geotiff file writing with writeToCRS") {
       var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(rasterdatalocation)
       df = df.selectExpr("image.origin as origin","image.geometry as geometry", "image.height as height", "image.width as width", "image.data as data", "image.nBands as nBands")
-      val savePath = resourceFolder + "raster-written/"
+      val savePath = tempDir + "/raster-written/"
       df.coalesce(1).write.mode("overwrite").format("geotiff").option("writeToCRS", "EPSG:4499").save(savePath)
 
       var loadPath = savePath
@@ -212,7 +216,7 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
     it("Should Pass geotiff file writing without coalesce") {
       var df = sparkSession.read.format("geotiff").option("dropInvalid", true).load(rasterdatalocation)
       df = df.selectExpr("image.origin as origin","image.geometry as geometry", "image.height as height", "image.width as width", "image.data as data", "image.nBands as nBands")
-      val savePath = resourceFolder + "raster-written/"
+      val savePath = tempDir + "/raster-written/"
       df.write.mode("overwrite").format("geotiff").save(savePath)
 
       var imageCount = 0
@@ -347,11 +351,56 @@ class rasterIOTest extends TestBaseScala with BeforeAndAfter with GivenWhenThen
         }
       }
     }
-    
-  }
-}
 
+    it("should read geotiff using binary source and write geotiff back to disk using raster source") {
+      // Write the binary-file rows through the "raster" sink with default options, then reload and count.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val expectedCount = sourceDf.count()
+      sourceDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
 
+    it("should read and write geotiff using given options") {
+      // Same round trip, but with the raster column, file extension and path column set explicitly.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val expectedCount = sourceDf.count()
+      sourceDf.write.format("raster")
+        .option("rasterField", "content")
+        .option("fileExtension", ".tiff")
+        .option("pathField", "path")
+        .mode(SaveMode.Overwrite)
+        .save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
 
+    it("should read and write via RS_FromGeoTiff and RS_AsGeoTiff") {
+      // Decode each file to a raster, re-encode it with RS_AsGeoTiff, write it out, then reload and count.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val encodedDf = sourceDf
+        .selectExpr("RS_FromGeoTiff(content) as raster", "path")
+        .selectExpr("RS_AsGeoTiff(raster) as content", "path")
+      val expectedCount = encodedDf.count()
+      encodedDf.write.format("raster")
+        .option("rasterField", "content")
+        .option("fileExtension", ".tiff")
+        .option("pathField", "path")
+        .mode(SaveMode.Overwrite)
+        .save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
+
+    it("should handle null") {
+      // Every row carries a null raster; the test expects three input rows and no files read back.
+      val sourceDf = sparkSession.read.format("binaryFile").load(rasterdatalocation)
+      val nullRasterDf = sourceDf
+        .selectExpr("RS_FromGeoTiff(null) as raster", "length")
+        .selectExpr("RS_AsGeoTiff(raster) as content", "length")
+      val inputCount = nullRasterDf.count()
+      nullRasterDf.write.format("raster").mode(SaveMode.Overwrite).save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromGeoTiff(content)")
+      assert(inputCount == 3)
+      assert(reloadedDf.count() == 0)
+    }
 
+    it("should read RS_FromGeoTiff and write RS_AsArcGrid") {
+      // Decode the color GeoTiff, export it as Arc Grid with band argument 1, write ".asc" files, then reload and count.
+      val sourceDf = sparkSession.read.format("binaryFile").load(resourceFolder + "raster_geotiff_color/*")
+      val arcGridDf = sourceDf
+        .selectExpr("RS_FromGeoTiff(content) as raster", "path")
+        .selectExpr("RS_AsArcGrid(raster, 1) as content", "path")
+      val expectedCount = arcGridDf.count()
+      arcGridDf.write.format("raster")
+        .option("rasterField", "content")
+        .option("fileExtension", ".asc")
+        .option("pathField", "path")
+        .mode(SaveMode.Overwrite)
+        .save(tempDir + "/raster-written")
+      val reloadedDf = sparkSession.read.format("binaryFile")
+        .load(tempDir + "/raster-written/*")
+        .selectExpr("RS_FromArcInfoAsciiGrid(content)")
+      assert(reloadedDf.count() == expectedCount)
+    }
+  }
 
+  // Delete the temporary output directory, then run the inherited teardown even if the
+  // deletion fails, so cleanup defined by the base suite is not skipped.
+  override def afterAll(): Unit = {
+    try FileUtils.deleteDirectory(new File(tempDir))
+    finally super.afterAll()
+  }
+}
\ No newline at end of file