Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/02/20 03:10:43 UTC

[GitHub] [spark] shardulm94 opened a new pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

shardulm94 opened a new pull request #31597:
URL: https://github.com/apache/spark/pull/31597


   
   ### What changes were proposed in this pull request?
   1) Modify `GenericAvroSerializer` to support serialization of any `GenericContainer`
   2) Register `KryoSerializer`s for `GenericData.{Array, EnumSymbol, Fixed}` using the modified `GenericAvroSerializer`
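Here is a minimal Avro-only sketch of why `GenericContainer` is a sufficient bound. Every `GenericContainer` exposes `getSchema`, so a single `GenericDatumWriter`/`GenericDatumReader` pair can encode and decode records, arrays, enums and fixed values alike. The names `encode` and `decode` are hypothetical helpers. This is not the actual `GenericAvroSerializer` code, which also compresses or fingerprints the schema and writes it alongside the datum.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericContainer, GenericData, GenericDatumReader, GenericDatumWriter}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object GenericContainerRoundTrip {
  // Hypothetical helper: the datum's own schema drives the writer, whatever the container kind.
  def encode(datum: GenericContainer): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericContainer](datum.getSchema).write(datum, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Hypothetical helper: the reader needs the writer's schema; here it is passed in directly
  // instead of being shipped with the bytes.
  def decode(bytes: Array[Byte], schema: Schema): AnyRef = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[AnyRef](schema).read(null, decoder)
  }

  def main(args: Array[String]): Unit = {
    val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
    val enumDatum = new GenericData.EnumSymbol(enumSchema, "A")
    val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
    val fixedDatum = new GenericData.Fixed(fixedSchema, "ABCD".getBytes("UTF-8"))

    // Both non-record containers survive the round trip unchanged.
    assert(decode(encode(enumDatum), enumSchema) == enumDatum)
    assert(decode(encode(fixedDatum), fixedSchema) == fixedDatum)
  }
}
```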
   
   
   
   ### Why are the changes needed?
   Without this change, Kryo throws NPEs when trying to serialize `GenericData.{Array, EnumSymbol, Fixed}`. More details are in the SPARK-34477 Jira.
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added unit tests for testing roundtrip serialization and deserialization of `GenericData.{Array, EnumSymbol, Fixed}` using `GenericAvroSerializer` directly and also indirectly through `KryoSerializer`
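Below is a sketch of the indirect round trip through `KryoSerializer`. It assumes a Spark build that includes this patch, with Avro on the classpath. It uses only Spark's public `SerializerInstance.serialize`/`deserialize` methods and is not the test code added in this PR. `KryoAvroRoundTrip` is a hypothetical name.

```scala
import java.nio.ByteBuffer

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

object KryoAvroRoundTrip {
  def main(args: Array[String]): Unit = {
    // A bare SparkConf is enough to build a KryoSerializer without a running SparkContext.
    val ser = new KryoSerializer(new SparkConf()).newInstance()

    val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
    val fixedDatum = new GenericData.Fixed(fixedSchema, "ABCD".getBytes("UTF-8"))

    // Kryo dispatches on the runtime class; with this patch GenericData.Fixed has a
    // dedicated Avro-aware serializer registered (assumption: patched Spark build).
    val bytes: ByteBuffer = ser.serialize(fixedDatum)
    val copy = ser.deserialize[GenericData.Fixed](bytes)

    assert(copy == fixedDatum)
  }
}
```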
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] xkrogen commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
xkrogen commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-786186282


   @mridulm would you be able to take a look? I think you have some experience in the Avro/Kryo/serializer area.




[GitHub] [spark] srowen commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
srowen commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-803449730


   OK but why not just transform to a normal / simple array type? I'm not against this, I'd merge it, just seems avoidable.
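The alternative suggested here can be sketched as copying the Avro array's elements into a plain Scala collection before the data reaches Spark's serializer. `toPlainSeq` is a hypothetical helper, not a Spark or Avro API. The trade-off is that the Avro schema no longer travels with the data.

```scala
import scala.collection.JavaConverters._

import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData

object PlainSeqSketch {
  // Hypothetical helper: GenericData.Array is a java.util.List, so its elements can be copied out.
  def toPlainSeq[T](arr: GenericData.Array[T]): Vector[T] = arr.asScala.toVector

  def main(args: Array[String]): Unit = {
    val schema = SchemaBuilder.array().items().stringType()
    val arr = new GenericData.Array[String](2, schema)
    arr.add("a")
    arr.add("b")
    // The Vector carries only the elements; the array schema is dropped.
    assert(toPlainSeq(arr) == Vector("a", "b"))
  }
}
```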




[GitHub] [spark] xkrogen commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r584868015



##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,13 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    def registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T]) =

Review comment:
       +1 good call!
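The `registerAvro` helper relies on an implicit `ClassTag` to recover the runtime class that Kryo registers. The standalone sketch below keeps the same type bounds as the diff but returns the class instead of calling `kryo.register`. `runtimeClassOf` is a hypothetical name. Because of erasure, `GenericData.Array[_]` resolves to a single class, so one registration covers every element type.

```scala
import scala.reflect.ClassTag

import org.apache.avro.generic.{GenericContainer, GenericData, GenericRecord}

object RegisterAvroSketch {
  // Same bounds as the PR's helper; returns the class Kryo would be asked to register.
  def runtimeClassOf[T >: Null <: GenericContainer](implicit ct: ClassTag[T]): Class[_] =
    ct.runtimeClass

  def main(args: Array[String]): Unit = {
    val registered = Seq(
      runtimeClassOf[GenericRecord],
      runtimeClassOf[GenericData.Record],
      runtimeClassOf[GenericData.Array[_]], // type argument erased: one class for all arrays
      runtimeClassOf[GenericData.EnumSymbol],
      runtimeClassOf[GenericData.Fixed])
    registered.foreach(c => println(c.getName))
  }
}
```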






[GitHub] [spark] SparkQA commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-805371213


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41009/
   




[GitHub] [spark] shardulm94 commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-782576508


   The test that failed above, `org.apache.spark.ml.source.image.ImageFileFormatSuite`, seems unrelated to this change. I also ran the test locally and it passes:
   `build/mvn test -Dtest=none -Dsuites="org.apache.spark.ml.source.image.ImageFileFormatSuite" -pl mllib`
   
   ```
   Run starting. Expected test count is: 7
   ImageFileFormatSuite:
   - Smoke test: create basic ImageSchema dataframe
   - image datasource count test
   - image datasource test: read jpg image
   - image datasource test: read png image
   - image datasource test: read non image
   - image datasource partition test
   - readImages pixel values test
   Run completed in 6 seconds, 939 milliseconds.
   Total number of tests run: 7
   Suites: completed 1, aborted 0
   Tests: succeeded 7, failed 0, canceled 0, ignored 0, pending 0
   ```




[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r581275741



##########
File path: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
##########
@@ -44,8 +44,8 @@ import org.apache.spark.util.Utils
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.

Review comment:
       Done

##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,88 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("GenericData.Record", recordDatum),
+    ("GenericData.Array", arrayDatum),
+    ("GenericData.EnumSymbol", enumDatum),
+    ("GenericData.Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: $name serialization and deserialization") {

Review comment:
       Done






[GitHub] [spark] AmplabJenkins removed a comment on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-805371236


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41009/
   




[GitHub] [spark] srowen commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
srowen commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-794698435


   When does this generally come up? It's not needed to read an Avro data source, and I'd suppose data wouldn't be carried back and forth in the internal representation classes much. I don't doubt there's some use case; I'm just getting a sense of whether it's niche or not.




[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r581249774



##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,88 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("GenericData.Record", recordDatum),
+    ("GenericData.Array", arrayDatum),
+    ("GenericData.EnumSymbol", enumDatum),
+    ("GenericData.Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: $name serialization and deserialization") {
+      val genericSer = new GenericAvroSerializer[datum.type](conf.getAvroSchema)
+
+      val outputStream = new ByteArrayOutputStream()
+      val output = new Output(outputStream)
+      genericSer.serializeDatum(datum, output)
+      output.flush()
+      output.close()
+
+      val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+      assert(genericSer.deserializeDatum(input) === datum)
+    }
+
+    test(s"SPARK-34477: $name serialization and deserialization through KryoSerializer ") {
+      require(conf.get(SERIALIZER) == "org.apache.spark.serializer.KryoSerializer")

Review comment:
       I decided to keep this for convenience. Since this test case tests Kryo specifically and won't work without it, the failure would be more obvious if someone removed the conf at the top of the file.






[GitHub] [spark] SparkQA commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-805367171


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41009/
   




[GitHub] [spark] AmplabJenkins commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-805344647


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/136425/
   




[GitHub] [spark] mridulm commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-794679335


   +CC @srowen @HeartSaVioR to get more eyes on serializer changes.




[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r586003264



##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,13 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    def registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T]) =
+      kryo.register(ct.runtimeClass, new GenericAvroSerializer[T](avroSchemas))
+    registerAvro[GenericRecord]
+    registerAvro[GenericData.Record]
+    registerAvro[GenericData.Array[_]]
+    registerAvro[GenericData.EnumSymbol]
+    registerAvro[GenericData.Fixed]

Review comment:
       I think for specific records we will need to use a different codepath inside Avro for serde, viz. `SpecificDatum(Reader|Writer)` instead of `GenericDatum(Reader|Writer)`, which is how `GenericAvroSerializer` is currently implemented. Also, I think we will need to register each concrete SpecificRecord class for Kryo to work correctly. Thoughts?






[GitHub] [spark] shardulm94 commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-786171171


   @xkrogen @dongjoon-hyun Who else would be a good set of reviewers to push this forward? Unfortunately, this code hasn't been modified by anyone other than the original author. Most other commits touching this file come from larger-scoped refactorings across the repo, so I didn't think the change would be relevant to their authors.




[GitHub] [spark] xkrogen commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
xkrogen commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r581197032



##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,88 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("GenericData.Record", recordDatum),
+    ("GenericData.Array", arrayDatum),
+    ("GenericData.EnumSymbol", enumDatum),
+    ("GenericData.Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: $name serialization and deserialization") {

Review comment:
      Minor nit: take the `GenericData` prefix out of the sequence above and instead change this line to `GenericData.$name serialization ...`?

##########
File path: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
##########
@@ -44,8 +44,8 @@ import org.apache.spark.util.Utils
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.

Review comment:
       Can we add a `@tparam` here?

##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,88 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("GenericData.Record", recordDatum),
+    ("GenericData.Array", arrayDatum),
+    ("GenericData.EnumSymbol", enumDatum),
+    ("GenericData.Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: $name serialization and deserialization") {
+      val genericSer = new GenericAvroSerializer[datum.type](conf.getAvroSchema)
+
+      val outputStream = new ByteArrayOutputStream()
+      val output = new Output(outputStream)
+      genericSer.serializeDatum(datum, output)
+      output.flush()
+      output.close()
+
+      val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+      assert(genericSer.deserializeDatum(input) === datum)
+    }
+
+    test(s"SPARK-34477: $name serialization and deserialization through KryoSerializer ") {
+      require(conf.get(SERIALIZER) == "org.apache.spark.serializer.KryoSerializer")

Review comment:
      Do we need the `require`? At the top of the class we're explicitly setting this value.

##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,15 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    kryo.register(classOf[GenericRecord], new GenericAvroSerializer[GenericRecord](avroSchemas))

Review comment:
       Can we get a little more DRY on these lines like:
   ```
       def registerAvro[T]: Unit = kryo.register(classOf[T], new GenericAvroSerializer[T](avroSchemas))
       registerAvro[GenericRecord]
       registerAvro[GenericData.Record]
       // ...
   ```
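A note on the suggestion above: in Scala, `classOf[T]` only accepts a concrete class type, so it does not compile when `T` is an abstract type parameter. A common way to recover the runtime class inside a generic helper is an implicit `scala.reflect.ClassTag[T]`, which is also the approach the later revision of this PR takes with `ct.runtimeClass`. The stdlib-only sketch below assumes nothing about Kryo; `RegistrySketch` and its map are hypothetical stand-ins for a class-keyed serializer registry, not Kryo's API.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Hypothetical stand-in for a serializer registry keyed by runtime class.
object RegistrySketch {
  val registrations: mutable.Map[Class[_], String] = mutable.LinkedHashMap.empty

  // `classOf[T]` would not compile here because T is abstract;
  // the implicit ClassTag carries T's erased runtime class instead.
  def register[T](serializerName: String)(implicit ct: ClassTag[T]): Unit =
    registrations(ct.runtimeClass) = serializerName
}
```

Calling `RegistrySketch.register[java.util.ArrayList[_]]("list-serializer")` registers the erased `java.util.ArrayList` class, which mirrors how a wildcard type such as `GenericData.Array[_]` can be passed as a type argument.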

##########
File path: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
##########
@@ -44,8 +44,8 @@ import org.apache.spark.util.Utils
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.
  */
-private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
-  extends KSerializer[GenericRecord] {
+private[serializer] class GenericAvroSerializer[D >: Null <: GenericContainer]

Review comment:
       Why do we need the lower bound on `Null` here?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang closed pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
gengliangwang closed pull request #31597:
URL: https://github.com/apache/spark/pull/31597


   




[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r581276488



##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,15 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    kryo.register(classOf[GenericRecord], new GenericAvroSerializer[GenericRecord](avroSchemas))

Review comment:
       Done. Thanks! This looks so much better now!






[GitHub] [spark] gengliangwang commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-805572128


   Github action tests passed. Merging to master.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-782547530








[GitHub] [spark] srowen commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
srowen commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-805324861


   Jenkins test this please




[GitHub] [spark] mridulm commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r586618449



##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,13 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    def registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T]) =
+      kryo.register(ct.runtimeClass, new GenericAvroSerializer[T](avroSchemas))
+    registerAvro[GenericRecord]
+    registerAvro[GenericData.Record]
+    registerAvro[GenericData.Array[_]]
+    registerAvro[GenericData.EnumSymbol]
+    registerAvro[GenericData.Fixed]

Review comment:
      To clarify, can we add that as a comment? :-)






[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r587572967



##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,13 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    def registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T]) =
+      kryo.register(ct.runtimeClass, new GenericAvroSerializer[T](avroSchemas))
+    registerAvro[GenericRecord]
+    registerAvro[GenericData.Record]
+    registerAvro[GenericData.Array[_]]
+    registerAvro[GenericData.EnumSymbol]
+    registerAvro[GenericData.Fixed]

Review comment:
       Ahh! Done






[GitHub] [spark] mridulm commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r584255958



##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,89 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("Record", recordDatum),
+    ("Array", arrayDatum),
+    ("EnumSymbol", enumDatum),
+    ("Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: GenericData.$name serialization and deserialization") {
+      val genericSer = new GenericAvroSerializer[datum.type](conf.getAvroSchema)
+
+      val outputStream = new ByteArrayOutputStream()
+      val output = new Output(outputStream)
+      genericSer.serializeDatum(datum, output)
+      output.flush()
+      output.close()
+
+      val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+      assert(genericSer.deserializeDatum(input) === datum)
+    }
+
+    test(s"SPARK-34477: GenericData.$name serialization and deserialization" +
+      s"through KryoSerializer ") {

Review comment:
      super nit: lose the `s`
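Besides the unneeded `s` interpolator on the second literal (it contains no `$` substitutions), the concatenation in the diff has no space between `deserialization` and `through`, and it ends with a trailing space. The stdlib-only sketch below compares the test name as written with a corrected version, using `Record` as an example value of `name`.

```scala
object TestNameSketch {
  val name = "Record"

  // As in the diff: the second literal starts without a leading space.
  val asWritten: String =
    s"SPARK-34477: GenericData.$name serialization and deserialization" +
      s"through KryoSerializer "

  // Plain literal for the constant part, with the spacing fixed.
  val corrected: String =
    s"SPARK-34477: GenericData.$name serialization and deserialization" +
      " through KryoSerializer"
}
```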

##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,89 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")

Review comment:
       ```suggestion
     override def beforeAll(): Unit = {
       conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
       super.beforeAll()
     }
   ```
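The suggested override makes the ordering explicit: the serializer setting is applied before `super.beforeAll()` runs. That ordering matters if, as the suggestion implies, `SharedSparkContext` builds its `SparkContext` from `conf` during `beforeAll()`. The stdlib-only sketch below models this with a hypothetical `SharedContextSketch` trait that snapshots `conf` when `beforeAll()` runs; it is an illustration, not Spark's test harness.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a shared-context test trait: beforeAll() builds the
// "context" from a snapshot of conf, so later conf changes are not visible to it.
trait SharedContextSketch {
  val conf: mutable.Map[String, String] = mutable.Map.empty
  var contextConf: Map[String, String] = Map.empty

  def beforeAll(): Unit = {
    contextConf = conf.toMap
  }
}

class KryoSuiteSketch extends SharedContextSketch {
  // Configure first, then let the parent create the context from conf.
  override def beforeAll(): Unit = {
    conf("spark.serializer") = "org.apache.spark.serializer.KryoSerializer"
    super.beforeAll()
  }
}
```

Setting the value in the class body would also run before `beforeAll()`, so the override mainly documents the dependency rather than changing behavior.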

##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,13 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    def registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T]) =
+      kryo.register(ct.runtimeClass, new GenericAvroSerializer[T](avroSchemas))
+    registerAvro[GenericRecord]
+    registerAvro[GenericData.Record]
+    registerAvro[GenericData.Array[_]]
+    registerAvro[GenericData.EnumSymbol]
+    registerAvro[GenericData.Fixed]

Review comment:
      Can you add a comment on why we are not handling `SpecificRecordBase` and `SpecificFixed`?

##########
File path: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
##########
@@ -44,8 +44,8 @@ import org.apache.spark.util.Utils
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.
  */
-private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
-  extends KSerializer[GenericRecord] {
+private[serializer] class GenericAvroSerializer[D >: Null <: GenericContainer]

Review comment:
       Use `null.asInstanceOf[D]` and remove bound on Null
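For context on the two options: a lower bound `D >: Null` is what allows the literal `null` to be used where a `D` is expected, while `null.asInstanceOf[D]` achieves the same thing without constraining the type parameter. Presumably the serializer passes `null` somewhere a `D` is expected, for example as a reader's "reuse" argument, but that is an assumption about code not shown in this thread. The stdlib-only sketch below contrasts the two forms using a hypothetical `Reader` trait.

```scala
object NullBoundSketch {
  // Hypothetical reader that accepts an instance to reuse (null means "allocate new").
  trait Reader[D] {
    def read(reuse: D): D
  }

  // Option 1: the lower bound `D >: Null` lets `null` be passed as a D directly.
  def readFreshBounded[D >: Null](reader: Reader[D]): D = reader.read(null)

  // Option 2: no bound on D; the null is cast instead, as the review suggests.
  // For reference types such as Avro containers this is still just null at runtime.
  def readFreshCast[D](reader: Reader[D]): D = reader.read(null.asInstanceOf[D])
}
```

One practical difference: the bound propagates to every generic caller, which is why `registerAvro` in the revision quoted in this thread is declared as `registerAvro[T >: Null <: GenericContainer]`; with the cast, the helper can be declared as `registerAvro[T <: GenericContainer]`, as in the follow-up suggestion.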

##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,13 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    def registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T]) =

Review comment:
      The method has a side effect; change it to:
   
   ```suggestion
       def registerAvro[T <: GenericContainer]()(implicit ct: ClassTag[T]): Unit =
         kryo.register(ct.runtimeClass, new GenericAvroSerializer[T](avroSchemas))
   ```
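The suggestion follows the Scala convention that a side-effecting method is declared with an empty parameter list `()` and an explicit `Unit` result type, so call sites read `registerAvro[GenericRecord]()`. Because the implicit `ClassTag` sits in a second parameter list, the empty list comes first. Below is a minimal stdlib-only sketch of the same shape; `registerType` and the `registered` buffer are hypothetical.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

object SideEffectSketch {
  val registered: mutable.ListBuffer[String] = mutable.ListBuffer.empty

  // Empty parens mark the call as side-effecting; the explicit Unit result type
  // discards whatever the underlying mutation returns.
  def registerType[T]()(implicit ct: ClassTag[T]): Unit = {
    registered += ct.runtimeClass.getName
  }
}
```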
   

##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,88 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("GenericData.Record", recordDatum),
+    ("GenericData.Array", arrayDatum),
+    ("GenericData.EnumSymbol", enumDatum),
+    ("GenericData.Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: $name serialization and deserialization") {
+      val genericSer = new GenericAvroSerializer[datum.type](conf.getAvroSchema)
+
+      val outputStream = new ByteArrayOutputStream()
+      val output = new Output(outputStream)
+      genericSer.serializeDatum(datum, output)
+      output.flush()
+      output.close()
+
+      val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+      assert(genericSer.deserializeDatum(input) === datum)
+    }
+
+    test(s"SPARK-34477: $name serialization and deserialization through KryoSerializer ") {
+      require(conf.get(SERIALIZER) == "org.apache.spark.serializer.KryoSerializer")

Review comment:
      This suite assumes the Kryo serializer, so I don't see much value in keeping it.






[GitHub] [spark] shardulm94 commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-804403342


   I think this is more relevant in a LinkedIn context; I'm not sure whether anyone else follows a similar pattern. At LinkedIn we use the Avro data model heavily across our batch and streaming (Kafka, Samza) stacks, and it is useful for our users to reuse libraries written against the Avro data model in both kinds of pipelines.




[GitHub] [spark] shardulm94 commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

Posted by GitBox <gi...@apache.org>.
shardulm94 commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-784175131


   cc: @xkrogen, @JDrit (just in case you are around)




[GitHub] [spark] AmplabJenkins commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

AmplabJenkins commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-782547530


   Can one of the admins verify this patch?




[GitHub] [spark] dongjoon-hyun commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

dongjoon-hyun commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-782567917


   cc @gengliangwang 




[GitHub] [spark] xkrogen commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

xkrogen commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r581258552



##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,88 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("GenericData.Record", recordDatum),
+    ("GenericData.Array", arrayDatum),
+    ("GenericData.EnumSymbol", enumDatum),
+    ("GenericData.Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: $name serialization and deserialization") {
+      val genericSer = new GenericAvroSerializer[datum.type](conf.getAvroSchema)
+
+      val outputStream = new ByteArrayOutputStream()
+      val output = new Output(outputStream)
+      genericSer.serializeDatum(datum, output)
+      output.flush()
+      output.close()
+
+      val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+      assert(genericSer.deserializeDatum(input) === datum)
+    }
+
+    test(s"SPARK-34477: $name serialization and deserialization through KryoSerializer ") {
+      require(conf.get(SERIALIZER) == "org.apache.spark.serializer.KryoSerializer")

Review comment:
       Makes sense, thanks.

##########
File path: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
##########
@@ -44,8 +44,8 @@ import org.apache.spark.util.Utils
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.
  */
-private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
-  extends KSerializer[GenericRecord] {
+private[serializer] class GenericAvroSerializer[D >: Null <: GenericContainer]

Review comment:
       Fascinating, thanks!






[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r586003489



##########
File path: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
##########
@@ -44,8 +44,8 @@ import org.apache.spark.util.Utils
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.
  */
-private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
-  extends KSerializer[GenericRecord] {
+private[serializer] class GenericAvroSerializer[D >: Null <: GenericContainer]

Review comment:
       Done






[GitHub] [spark] shardulm94 commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

shardulm94 commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-799234187


   @srowen This only comes up in the RDD APIs when you have an `RDD[GenericXXX]` and, for some reason, need to serialize it using Kryo, e.g. to broadcast it or to collect it on the driver. The most common case, of course, is an `RDD[GenericRecord]` created by reading an Avro file, and that case was already handled. However, we have customers who project a single field from an `RDD[GenericRecord]` using `.map`; for array fields this yields an `RDD[GenericData.Array]`, which then causes Kryo failures.
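   To illustrate the failure mode described above, here is a toy Scala model (not Kryo's actual implementation) of a registry that looks serializers up by a value's exact runtime class. `Container`, `RecordLike`, `ArrayLike`, and `serializerFor` are hypothetical stand-ins for `GenericContainer`, `GenericData.Record`, `GenericData.Array`, and Kryo's lookup; the assumption is that, as with `kryo.register`, a registration applies only to the exact class it names.

```scala
// Toy model only: hypothetical stand-ins, not Avro or Kryo classes.
object ExactClassLookup {
  // Stand-ins for GenericContainer, GenericData.Record and GenericData.Array.
  trait Container
  final class RecordLike extends Container
  final class ArrayLike extends Container

  // Assumed behaviour: registrations are keyed by the exact class they name,
  // mirroring calls such as kryo.register(classOf[...], serializer).
  private val registered: Map[Class[_], String] =
    Map(classOf[RecordLike] -> "avro-serializer")

  // Look up by the value's exact runtime class; anything unregistered falls
  // through to a default, even if it shares a supertype with a registered class.
  def serializerFor(value: AnyRef): String =
    registered.getOrElse(value.getClass, "fallback-default-serializer")

  def main(args: Array[String]): Unit = {
    println(serializerFor(new RecordLike)) // avro-serializer
    println(serializerFor(new ArrayLike))  // fallback-default-serializer
  }
}
```

   This is why the PR registers the concrete `GenericData.{Array, EnumSymbol, Fixed}` classes explicitly instead of relying on the existing `GenericRecord` registration.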




[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r586003615



##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,88 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("GenericData.Record", recordDatum),
+    ("GenericData.Array", arrayDatum),
+    ("GenericData.EnumSymbol", enumDatum),
+    ("GenericData.Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: $name serialization and deserialization") {
+      val genericSer = new GenericAvroSerializer[datum.type](conf.getAvroSchema)
+
+      val outputStream = new ByteArrayOutputStream()
+      val output = new Output(outputStream)
+      genericSer.serializeDatum(datum, output)
+      output.flush()
+      output.close()
+
+      val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+      assert(genericSer.deserializeDatum(input) === datum)
+    }
+
+    test(s"SPARK-34477: $name serialization and deserialization through KryoSerializer ") {
+      require(conf.get(SERIALIZER) == "org.apache.spark.serializer.KryoSerializer")

Review comment:
       Removed






[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r586003835



##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,89 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
 
-  val schema : Schema = SchemaBuilder
+  val recordSchema : Schema = SchemaBuilder
     .record("testRecord").fields()
     .requiredString("data")
     .endRecord()
-  val record = new Record(schema)
-  record.put("data", "test data")
+  val recordDatum = new Record(recordSchema)
+  recordDatum.put("data", "test data")
 
-  test("schema compression and decompression") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
-  }
+  val arraySchema = SchemaBuilder.array().items().`type`(recordSchema)
+  val arrayDatum = new AvroArray[Record](1, arraySchema)
+  arrayDatum.add(recordDatum)
 
-  test("record serialization and deserialization") {
-    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+  val enumSchema = SchemaBuilder.enumeration("enum").symbols("A", "B")
+  val enumDatum = new EnumSymbol(enumSchema, "A")
 
-    val outputStream = new ByteArrayOutputStream()
-    val output = new Output(outputStream)
-    genericSer.serializeDatum(record, output)
-    output.flush()
-    output.close()
+  val fixedSchema = SchemaBuilder.fixed("fixed").size(4)
+  val fixedDatum = new Fixed(fixedSchema, "ABCD".getBytes)
 
-    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
-    assert(genericSer.deserializeDatum(input) === record)
+  test("schema compression and decompression") {
+    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
+    assert(recordSchema ===
+      genericSer.decompress(ByteBuffer.wrap(genericSer.compress(recordSchema))))
   }
 
   test("uses schema fingerprint to decrease message size") {
-    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)
+    val genericSerFull = new GenericAvroSerializer[Record](conf.getAvroSchema)
 
     val output = new Output(new ByteArrayOutputStream())
 
     val beginningNormalPosition = output.total()
-    genericSerFull.serializeDatum(record, output)
+    genericSerFull.serializeDatum(recordDatum, output)
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchemas(schema)
-    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
+    conf.registerAvroSchemas(recordSchema)
+    val genericSerFinger = new GenericAvroSerializer[Record](conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
-    genericSerFinger.serializeDatum(record, output)
+    genericSerFinger.serializeDatum(recordDatum, output)
     val fingerprintLength = output.total - beginningFingerprintPosition
 
     assert(fingerprintLength < normalLength)
   }
 
   test("caches previously seen schemas") {
     val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
-    val compressedSchema = genericSer.compress(schema)
+    val compressedSchema = genericSer.compress(recordSchema)
     val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))
 
-    assert(compressedSchema.eq(genericSer.compress(schema)))
+    assert(compressedSchema.eq(genericSer.compress(recordSchema)))
     assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
   }
+
+  Seq(
+    ("Record", recordDatum),
+    ("Array", arrayDatum),
+    ("EnumSymbol", enumDatum),
+    ("Fixed", fixedDatum)
+  ).foreach { case (name, datum) =>
+    test(s"SPARK-34477: GenericData.$name serialization and deserialization") {
+      val genericSer = new GenericAvroSerializer[datum.type](conf.getAvroSchema)
+
+      val outputStream = new ByteArrayOutputStream()
+      val output = new Output(outputStream)
+      genericSer.serializeDatum(datum, output)
+      output.flush()
+      output.close()
+
+      val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
+      assert(genericSer.deserializeDatum(input) === datum)
+    }
+
+    test(s"SPARK-34477: GenericData.$name serialization and deserialization" +
+      " through KryoSerializer ") {

Review comment:
       Done

##########
File path: core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
##########
@@ -22,64 +22,89 @@ import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Input, Output}
 import org.apache.avro.{Schema, SchemaBuilder}
-import org.apache.avro.generic.GenericData.Record
+import org.apache.avro.generic.GenericData.{Array => AvroArray, EnumSymbol, Fixed, Record}
 
 import org.apache.spark.{SharedSparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SERIALIZER
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")

Review comment:
       Done






[GitHub] [spark] AmplabJenkins commented on pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

AmplabJenkins commented on pull request #31597:
URL: https://github.com/apache/spark/pull/31597#issuecomment-805371236


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/41009/
   




[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r586003985



##########
File path: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
##########
@@ -153,8 +153,13 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(classOf[SerializableJobConf], new KryoJavaSerializer())
     kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())
 
-    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
-    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))
+    def registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T]) =

Review comment:
       Done
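   The `registerAvro[T >: Null <: GenericContainer](implicit ct: ClassTag[T])` helper in this hunk uses a `ClassTag` to recover `T`'s runtime class, so one generic method can register a serializer for several concrete classes. The helper's body is not shown in this excerpt, so the following is only a self-contained sketch of the general pattern. `Registry`, `Ser`, `registerWith`, and the `*Like` classes are hypothetical stand-ins for Kryo and the Avro `GenericData` classes, and the `>: Null` bound is omitted because this sketch never needs a null `T`.

```scala
import scala.collection.mutable
import scala.reflect.ClassTag

// Sketch of ClassTag-driven registration; all names here are hypothetical.
object ClassTagRegistration {
  trait Container
  final class RecordLike extends Container
  final class ArrayLike extends Container
  final class FixedLike extends Container

  // Stand-in for a per-class serializer; remembers which class it was built for.
  final case class Ser(target: Class[_])

  // Stand-in for kryo.register(cls, serializer): one entry per exact class.
  final class Registry {
    private val byClass = mutable.LinkedHashMap.empty[Class[_], Ser]
    def register(cls: Class[_], ser: Ser): Unit = byClass(cls) = ser
    def lookup(cls: Class[_]): Option[Ser] = byClass.get(cls)
    def size: Int = byClass.size
  }

  // The implicit ClassTag supplies T's runtime class, so callers write
  // registerWith[ArrayLike](reg) instead of repeating classOf[ArrayLike].
  def registerWith[T <: Container](reg: Registry)(implicit ct: ClassTag[T]): Unit = {
    val cls = ct.runtimeClass
    reg.register(cls, Ser(cls))
  }

  def main(args: Array[String]): Unit = {
    val reg = new Registry
    registerWith[RecordLike](reg)
    registerWith[ArrayLike](reg)
    registerWith[FixedLike](reg)
    println(reg.size) // 3
    println(reg.lookup(classOf[ArrayLike]).map(_.target) == Some(classOf[ArrayLike])) // true
  }
}
```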






[GitHub] [spark] shardulm94 commented on a change in pull request #31597: [SPARK-34477][CORE] Register KryoSerializers for Avro GenericData classes

shardulm94 commented on a change in pull request #31597:
URL: https://github.com/apache/spark/pull/31597#discussion_r581253175



##########
File path: core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
##########
@@ -44,8 +44,8 @@ import org.apache.spark.util.Utils
  *                string representation of the Avro schema, used to decrease the amount of data
  *                that needs to be serialized.
  */
-private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
-  extends KSerializer[GenericRecord] {
+private[serializer] class GenericAvroSerializer[D >: Null <: GenericContainer]

Review comment:
       We need to pass in a `null` at https://github.com/apache/spark/blob/735246119836f20811c67193f778246e1157e63e/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala#L155 where the compiler expects a `D`. Without the `>: Null` lower bound, the Scala compiler won't allow that.
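       A minimal Scala sketch of why the `>: Null` bound is needed. `Container`, `Rec`, and `Reader` are hypothetical; `Reader.read(reuse)` is only loosely modeled on Avro's `DatumReader.read(reuse, decoder)`, which accepts `null` as the object to reuse. With only an upper bound, the compiler cannot prove that `Null` conforms to an abstract `D` (it could be instantiated as `Nothing`), so passing `null` is a type error; adding `D >: Null` makes it legal.

```scala
// Hypothetical types; Reader only loosely mirrors Avro's DatumReader.read(reuse, decoder).
object NullLowerBound {
  trait Container { def schemaName: String }
  final class Rec(val schemaName: String) extends Container

  // `D >: Null` lets the class pass `null` where a D is expected.
  final class Reader[D >: Null <: Container](make: () => D) {
    // Reuse the supplied object if there is one, otherwise allocate a fresh one.
    def read(reuse: D): D = if (reuse != null) reuse else make()

    // Type-checks only because of the lower bound: Null <: D is now provable.
    def readFresh(): D = read(null)
  }

  // Without the lower bound, scalac rejects the equivalent call with a type mismatch:
  //   final class Bad[D <: Container] { def read(r: D): D = r; def f(): D = read(null) }

  def main(args: Array[String]): Unit = {
    val reader = new Reader[Rec](() => new Rec("testRecord"))
    println(reader.readFresh().schemaName) // testRecord
    val existing = new Rec("existing")
    println(reader.read(existing) eq existing) // true
  }
}
```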



