You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@sedona.apache.org by ji...@apache.org on 2023/03/10 19:13:23 UTC

[sedona] branch master updated: [SEDONA-258] Fix deserialization of Circle objects in python-adapter (#793)

This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/sedona.git


The following commit(s) were added to refs/heads/master by this push:
     new 2664cd79 [SEDONA-258] Fix deserialization of Circle objects in python-adapter (#793)
2664cd79 is described below

commit 2664cd792040ee7f56e28d34665ae2b7dc74b4eb
Author: Kristin Cowalcijk <mo...@yeah.net>
AuthorDate: Sat Mar 11 03:13:16 2023 +0800

    [SEDONA-258] Fix deserialization of Circle objects in python-adapter (#793)
---
 .../translation/GeometryRddConverter.scala         |  3 +-
 .../translation/PythonGeometrySerializer.scala     | 27 +++++++------
 .../translation/PythonRDDToJavaConverter.scala     | 46 +++++++++++-----------
 .../sedona/python/wrapper/GeometrySample.scala     |  3 ++
 .../python/wrapper/TestToPythonSerialization.scala | 22 ++++++++---
 python/tests/test_assign_raw_spatial_rdd.py        | 22 ++++++++++-
 6 files changed, 80 insertions(+), 43 deletions(-)

diff --git a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometryRddConverter.scala b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometryRddConverter.scala
index 67dffda1..1071cf3a 100644
--- a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometryRddConverter.scala
+++ b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/GeometryRddConverter.scala
@@ -31,8 +31,7 @@ private[python] case class GeometryRddConverter[T <: Geometry](spatialRDD: JavaR
       val typeBuffer = 0.toByteArray()
       val sizeBuffer = 0.toByteArray()
       typeBuffer ++ geometrySerializer.serialize(geom) ++ sizeBuffer
-    }
-    ).toJavaRDD()
+    }).toJavaRDD()
   }
 
 }
diff --git a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonGeometrySerializer.scala b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonGeometrySerializer.scala
index 6af6bcb9..722923fd 100644
--- a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonGeometrySerializer.scala
+++ b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonGeometrySerializer.scala
@@ -36,26 +36,29 @@ private[python] class PythonGeometrySerializer extends Serializable {
       - Translate user attributes using UTF-8 encoding
    */
 
-  def serialize: (Geometry => Array[Byte]) = {
-    case geometry: Circle => CircleSerializer(geometry).serialize
-    case geometry: Geometry => GeometrySerializer(geometry).serialize
-
+  def serialize(geometry: Geometry): Array[Byte] = {
+    geometry match {
+      case circle: Circle => CircleSerializer(circle).serialize
+      case _ => GeometrySerializer(geometry).serialize
+    }
   }
 
-  def deserialize: Array[Byte] => Geometry = (values: Array[Byte]) => {
-    val reader = new WKBReader()
+  def deserialize(values: Array[Byte]): Geometry = {
     val isCircle = values.head.toInt
-    val valuesLength = values.length
+    deserialize(isCircle, values, 1)
+  }
 
+  def deserialize(isCircle: Int, values: Array[Byte], offset: Int): Geometry = {
+    val reader = new WKBReader()
     if (isCircle == 1) {
-      val geom = reader.read(values.slice(9, valuesLength))
-      val radius = ByteBuffer.wrap(values.slice(1, 9)).getDouble()
+      val geom = reader.read(values.slice(offset + 8, values.length))
+      val radius = ByteBuffer.wrap(values.slice(offset, offset + 8)).getDouble()
       new Circle(geom, radius)
     }
     else if (isCircle == 0) {
-      reader.read(values.slice(1, valuesLength))
+      reader.read(values.slice(offset, values.length))
+    } else {
+      throw SerializationException("Can not deserialize object")
     }
-    else throw SerializationException("Can not deserialize object")
-
   }
 }
diff --git a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonRDDToJavaConverter.scala b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonRDDToJavaConverter.scala
index 43f7d6b3..3dd21157 100644
--- a/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonRDDToJavaConverter.scala
+++ b/python-adapter/src/main/scala/org.apache.sedona.python.wrapper/translation/PythonRDDToJavaConverter.scala
@@ -20,47 +20,47 @@
 package org.apache.sedona.python.wrapper.translation
 
 import java.io.{ByteArrayInputStream, DataInputStream}
-
 import org.apache.sedona.common.geometryObjects.Circle
 import org.apache.sedona.python.wrapper.SerializationException
 import org.apache.spark.api.java.JavaRDD
 import org.locationtech.jts.geom.Geometry
 
+import java.nio.{ByteBuffer, ByteOrder}
+
 case class PythonRDDToJavaConverter(javaRDD: JavaRDD[Array[Byte]], geometrySerializer: PythonGeometrySerializer) {
   def translateToJava: JavaRDD[Geometry] = {
     javaRDD.rdd.map[Geometry](serializedGeoData => {
       val geoDataBytes = new ByteArrayInputStream(serializedGeoData)
       val geoDataInputBytes = new DataInputStream(geoDataBytes)
       val rddType = geoDataInputBytes.readInt()
-      val isCircle = geoDataInputBytes.readByte().toInt
-      if (isCircle == 1) {
-        val radius = geoDataInputBytes.readDouble()
-        val geometry = readGeometry(serializedGeoData, geoDataInputBytes, 20)
-        new Circle(geometry, radius)
-      }
-      if (isCircle == 0) {
-        readGeometry(serializedGeoData, geoDataInputBytes, 12)
-      }
-      else throw SerializationException()
-
-
-    }
-
-    )
+      readGeometry(serializedGeoData, geoDataInputBytes)
+    })
   }
 
-  private def readGeometry(serializedGeoData: Array[Byte], inputStream: DataInputStream, skipBytes: Int): Geometry = {
+  private def readGeometry(serializedGeoData: Array[Byte], inputStream: DataInputStream): Geometry = {
+    val isCircle = inputStream.readByte().toInt
     val geomDataLength = java.lang.Integer.reverseBytes(inputStream.readInt())
     val userDataLength = java.lang.Integer.reverseBytes(inputStream.readInt())
+    val skipBytes = 13  // 4 bytes for rddType, 1 byte for isCircle, 4 bytes for geomDataLength, 4 bytes for userDataLength
 
-    val toReadGeometry = serializedGeoData.slice(skipBytes, skipBytes + geomDataLength + 1)
-    val geom = geometrySerializer.deserialize(toReadGeometry)
-
-    val userData = serializedGeoData.slice(skipBytes + geomDataLength + 1, skipBytes + geomDataLength + 1 + userDataLength)
+    // Circle will be handled specially in the next step. We don't let geometrySerializer.deserialize handle Circle
+    // for us since this serialization format is a little bit different from the one used in PythonGeometrySerializer.
+    val geom = geometrySerializer.deserialize(0, serializedGeoData, skipBytes)
+    val userData = serializedGeoData.slice(skipBytes + geomDataLength, skipBytes + geomDataLength + userDataLength)
       .map(_.toChar).mkString
 
-    geom.setUserData(userData)
-    geom
+    val finalGeom = if (isCircle == 1) {
+      val radiusOffset = skipBytes + geomDataLength + userDataLength
+      val radiusBytes = serializedGeoData.slice(radiusOffset, radiusOffset + 8)
+      val radius = ByteBuffer.wrap(radiusBytes).order(ByteOrder.LITTLE_ENDIAN).getDouble
+      new Circle(geom, radius)
+    } else if (isCircle == 0) {
+      geom
+    } else {
+      throw SerializationException("Can not deserialize object")
+    }
+    finalGeom.setUserData(userData)
+    finalGeom
   }
 }
 
diff --git a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala
index ea5078eb..8e90a444 100644
--- a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala
+++ b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/GeometrySample.scala
@@ -19,6 +19,7 @@
 
 package org.apache.sedona.python.wrapper
 
+import org.apache.sedona.common.geometryObjects.Circle
 import org.locationtech.jts.geom.Geometry
 
 import java.io.{FileInputStream, InputStream}
@@ -31,6 +32,8 @@ trait GeometrySample extends PythonTestSpec {
   private[python] val samplePoints: List[Geometry] =
     loadGeometriesFromResources(resourceFolder + "python/samplePoints")
 
+  private[python] val sampleCircles: List[Geometry] = samplePoints.map(new Circle(_, 1.0))
+
   private[python] val sampleLines: List[Geometry] =
     loadGeometriesFromResources(resourceFolder + "python/sampleLines")
 
diff --git a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala
index ab619852..b390acfe 100644
--- a/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala
+++ b/python-adapter/src/test/scala/org/apache/sedona/python/wrapper/TestToPythonSerialization.scala
@@ -19,10 +19,12 @@
 
 package org.apache.sedona.python.wrapper
 
-import org.apache.sedona.python.wrapper.translation.{FlatPairRddConverter, GeometryRddConverter, ListPairRddConverter}
+import org.apache.sedona.python.wrapper.adapters.PythonConverter
+import org.apache.sedona.python.wrapper.translation.{FlatPairRddConverter, GeometryRddConverter, ListPairRddConverter, PythonRDDToJavaConverter}
 import org.apache.spark.api.java.JavaPairRDD
 import org.scalatest.Matchers
 import org.apache.sedona.python.wrapper.utils.implicits._
+
 import scala.jdk.CollectionConverters._
 
 
@@ -34,7 +36,18 @@ class TestToPythonSerialization extends SparkUtil with GeometrySample with Match
       case a: Array[Byte] => a.toList
     })
     convertedToPythonArrays should contain theSameElementsAs expectedPointArray
+  }
+
+  test("Test between Circle RDD and Python RDD") {
+    val rddPython = PythonConverter.translateSpatialRDDToPython(circleSpatialRDD)
+    val rddJava = PythonConverter.translatePythonRDDToJava(rddPython)
+    rddJava.collect() should contain theSameElementsAs sampleCircles
+  }
 
+  test("Test between Point RDD and Python RDD") {
+    val rddPython = PythonConverter.translateSpatialRDDToPython(pointSpatialRDD)
+    val rddJava = PythonConverter.translatePythonRDDToJava(rddPython)
+    rddJava.collect() should contain theSameElementsAs samplePoints
   }
 
   test("Should serialize to Python JavaRDD[Geometry, Geometry]") {
@@ -55,9 +68,9 @@ class TestToPythonSerialization extends SparkUtil with GeometrySample with Match
     existingValues should contain theSameElementsAs expectedPairRDDWithListPythonArray
   }
 
-  private val pointSpatialRDD = sc.parallelize(
-    samplePoints
-  ).toJavaRDD()
+  private val pointSpatialRDD = sc.parallelize(samplePoints).toJavaRDD()
+
+  private val circleSpatialRDD = sc.parallelize(sampleCircles).toJavaRDD()
 
   private val spatialPairRDD = sc.parallelize(
     samplePoints.zip(samplePolygons).map(
@@ -71,7 +84,6 @@ class TestToPythonSerialization extends SparkUtil with GeometrySample with Match
     )
   )
 
-
   private val expectedPointArray: List[List[Byte]] = samplePoints.map(point =>
     0.toByteArray().toList ++ pythonGeometrySerializer.serialize(point).toList ++ 0.toByteArray().toList)
 
diff --git a/python/tests/test_assign_raw_spatial_rdd.py b/python/tests/test_assign_raw_spatial_rdd.py
index 6d5d2052..1c288364 100644
--- a/python/tests/test_assign_raw_spatial_rdd.py
+++ b/python/tests/test_assign_raw_spatial_rdd.py
@@ -15,7 +15,7 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
-from sedona.core.SpatialRDD import PointRDD
+from sedona.core.SpatialRDD import PointRDD, CircleRDD
 from tests.properties.point_properties import input_location, offset, splitter, num_partitions
 from tests.test_base import TestBase
 from pyspark import StorageLevel
@@ -43,3 +43,23 @@ class TestSpatialRddAssignment(TestBase):
 
         assert empty_point_rdd.rawSpatialRDD.map(lambda x: x.geom.area).collect()[0] == 0.0
         assert empty_point_rdd.rawSpatialRDD.take(9)[4].getUserData() == "testattribute0\ttestattribute1\ttestattribute2"
+
+    def test_raw_circle_rdd_assignment(self):
+        point_rdd = PointRDD(
+            self.sc,
+            input_location,
+            offset,
+            splitter,
+            True,
+            num_partitions,
+            StorageLevel.MEMORY_ONLY
+        )
+        circle_rdd = CircleRDD(point_rdd, 1.0)
+        circle_rdd.analyze()
+
+        circle_rdd_2 = CircleRDD(point_rdd, 2.0)
+        circle_rdd_2.rawSpatialRDD = circle_rdd.rawSpatialRDD
+        circle_rdd_2.analyze()
+
+        assert circle_rdd_2.countWithoutDuplicates() == circle_rdd.countWithoutDuplicates()
+        assert circle_rdd_2.boundaryEnvelope == circle_rdd.boundaryEnvelope