Posted to issues@flink.apache.org by "Akshay Hazari (Jira)" <ji...@apache.org> on 2022/04/12 07:42:00 UTC
[jira] [Closed] (FLINK-27193) Kryo Serialisation and Deserialisation returns a different object
[ https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Akshay Hazari closed FLINK-27193.
---------------------------------
Resolution: Fixed
> Kryo Serialisation and Deserialisation returns a different object
> ------------------------------------------------------------------
>
> Key: FLINK-27193
> URL: https://issues.apache.org/jira/browse/FLINK-27193
> Project: Flink
> Issue Type: Bug
> Reporter: Akshay Hazari
> Priority: Minor
>
> We have a unit test that checks whether Kryo serialisation followed by deserialisation yields the same value, but it fails.
> The KryoSerializer is used for both serialisation and deserialisation like this:
> {code:java}
> import kotlin.reflect.KClass
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> import org.apache.flink.core.memory.DataInputDeserializer
> import org.apache.flink.core.memory.DataOutputSerializer
> class KryoSerializerExtension {
>     // Round-trips a value through Kryo: serialise to bytes, then deserialise.
>     fun <T : Any> serde(t: T): T {
>         val bytes = serialize(t)
>         return deserialize(bytes, t::class)
>     }
>
>     fun serialize(any: Any): ByteArray {
>         val config = ExecutionConfig()
>         config.registerKryoType(any.javaClass)
>         val serializer = KryoSerializer(any.javaClass, config)
>         val output = DataOutputSerializer(1)
>         serializer.serialize(any, output)
>         // sharedBuffer is the whole backing array and may be longer than the bytes written.
>         return output.sharedBuffer
>     }
>
>     fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
>         val config = ExecutionConfig()
>         config.registerKryoType(kClass.java)
>         val serializer = KryoSerializer(kClass.java, config)
>         val input = DataInputDeserializer(bytes)
>         return serializer.deserialize(input)
>     }
> }
> {code}
>
> The unit test simply looks like this:
> {code:java}
> @Test
> fun fieldRecord() {
>     val record = getFieldRecord()
>     val result = kryo.serde(record)
>     assertThat(result).isEqualTo(record)
> }
> {code}
> This is the expected vs actual assertion error.
> The record is large, and the hashes of all its components come out different after the round trip. I am not sure how the record is modified.
> {code:java}
> org.opentest4j.AssertionFailedError:
> expected: "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
> Flavor : SPARSE
> LgK : 10
> Merge Flag : false
> Error Const : 0.5887050112577373
> RSE : 0.01839703160180429
> Seed Hash : 93cc | 37836
> Num Coupons : 2
> Num Pairs (SV) : 2
> First Inter Col: 0
> Valid Window : false
> Valid PairTable: true
> Window Offset : 0
> KxP : 1023.375
> HIP Accum : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY:
> Empty : false
> Direct, Capacity bytes : false,
> Estimation Mode : false
> K : 128
> N : 2
> Levels (Needed, Total, Valid): 0, 0, 0
> Level Bit Pattern : 0
> BaseBufferCount : 2
> Combined Buffer Capacity : 4
> Retained Items : 2
> Compact Storage Bytes : 48
> Updatable Storage Bytes : 64
> Normalized Rank Error : 1.406%
> Normalized Rank Error (PMF) : 1.711%
> Min Value : 1.000000e+00
> Max Value : 3.000000e+00
> ### END SKETCH SUMMARY
> , numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
> n: 1
> min: 3.0
> max: 3.0
> sum: 3.0
> mean: 3.0
> geometric mean: 3.0000000000000004
> variance: 0.0
> population variance: 0.0
> second moment: 0.0
> sum of squares: 9.0
> standard deviation: 0.0
> sum of logs: 1.0986122886681098
> ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
> but was : "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
> Flavor : SPARSE
> LgK : 10
> Merge Flag : false
> Error Const : 0.5887050112577373
> RSE : 0.01839703160180429
> Seed Hash : 93cc | 37836
> Num Coupons : 2
> Num Pairs (SV) : 2
> First Inter Col: 0
> Valid Window : false
> Valid PairTable: true
> Window Offset : 0
> KxP : 1023.375
> HIP Accum : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY:
> Empty : false
> Direct, Capacity bytes : false,
> Estimation Mode : false
> K : 128
> N : 2
> Levels (Needed, Total, Valid): 0, 0, 0
> Level Bit Pattern : 0
> BaseBufferCount : 2
> Combined Buffer Capacity : 4
> Retained Items : 2
> Compact Storage Bytes : 48
> Updatable Storage Bytes : 64
> Normalized Rank Error : 1.406%
> Normalized Rank Error (PMF) : 1.711%
> Min Value : 1.000000e+00
> Max Value : 3.000000e+00
> ### END SKETCH SUMMARY
> , numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
> n: 1
> min: 3.0
> max: 3.0
> sum: 3.0
> mean: 3.0
> geometric mean: 3.0000000000000004
> variance: 0.0
> population variance: 0.0
> second moment: 0.0
> sum of squares: 9.0
> standard deviation: 0.0
> sum of logs: 1.0986122886681098
> ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
>
> {code}
>
> Is there an issue with the way we are serialising and deserialising this?
> Any help is appreciated.
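
The expected and actual dumps in the assertion error are textually identical apart from the trailing instance identifier (FieldRecord@3249a1ce vs FieldRecord@769a58e5). One possible reading, which is an assumption because the definitions of FieldRecord and its calculators are not shown in the report, is that some component does not define a value-based equals(), so the comparison falls back to reference equality even though the data survived the round trip. A minimal sketch of that effect follows; OpaqueSketch, ValueSketch, Record and FixedRecord are hypothetical stand-ins, not the reporter's classes.

```kotlin
// Hypothetical stand-in for a component that defines no equals()/hashCode(),
// so comparisons fall back to reference equality.
class OpaqueSketch(val n: Int) {
    override fun toString() = "OpaqueSketch(n=$n)"
}

// A data class compares its components with equals(), so it inherits the problem.
data class Record(val name: String, val sketch: OpaqueSketch)

// The same component with value-based equality.
class ValueSketch(val n: Int) {
    override fun equals(other: Any?) = other is ValueSketch && other.n == n
    override fun hashCode() = n
    override fun toString() = "ValueSketch(n=$n)"
}

data class FixedRecord(val name: String, val sketch: ValueSketch)

fun main() {
    // The second instance simulates a deserialised copy: same data, new object.
    val original = Record("foo", OpaqueSketch(2))
    val copy = Record("foo", OpaqueSketch(2))

    println(original.toString() == copy.toString()) // true: identical dump
    println(original == copy)                       // false: OpaqueSketch uses reference equality

    val fixedOriginal = FixedRecord("foo", ValueSketch(2))
    val fixedCopy = FixedRecord("foo", ValueSketch(2))
    println(fixedOriginal == fixedCopy)             // true: value-based equality
}
```

If that is the cause here, the test could compare field by field instead, for example with AssertJ's usingRecursiveComparison(), or the component classes could be wrapped in types that implement equals() and hashCode(). Whether either option fits the real FieldRecord is not something the report confirms.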
--
This message was sent by Atlassian Jira
(v8.20.1#820001)