Posted to issues@flink.apache.org by "Akshay Hazari (Jira)" <ji...@apache.org> on 2022/04/12 07:42:00 UTC

[jira] [Closed] (FLINK-27193) Kryo Serialisation and Deserialisation returns a different object

     [ https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Akshay Hazari closed FLINK-27193.
---------------------------------
    Resolution: Fixed

> Kryo Serialisation and Deserialisation returns a different object
> ------------------------------------------------------------------
>
>                 Key: FLINK-27193
>                 URL: https://issues.apache.org/jira/browse/FLINK-27193
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Akshay Hazari
>            Priority: Minor
>
> We have a unit test that checks whether Kryo serialisation and deserialisation result in the same value, but it fails.
> The KryoSerializer and deserializer are used like this:
> {code:java}
> import kotlin.reflect.KClass
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> import org.apache.flink.core.memory.DataInputDeserializer
> import org.apache.flink.core.memory.DataOutputSerializer
>
> class KryoSerializerExtension {
>
>     fun <T : Any> serde(t: T): T {
>         val bytes = serialize(t)
>         return deserialize(bytes, t::class)
>     }
>
>     fun serialize(any: Any): ByteArray {
>         val config = ExecutionConfig()
>         config.registerKryoType(any.javaClass)
>         val serializer = KryoSerializer(any.javaClass, config)
>         val output = DataOutputSerializer(1)
>         serializer.serialize(any, output)
>         return output.sharedBuffer
>     }
>
>     fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
>         val config = ExecutionConfig()
>         config.registerKryoType(kClass.java)
>         val serializer = KryoSerializer(kClass.java, config)
>         val input = DataInputDeserializer(bytes)
>         return serializer.deserialize(input)
>     }
> }
> {code}
>  
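> For comparison, a minimal round-trip sketch (the helper name roundTrip is illustrative and not part of the project; it uses the same Flink imports as the class above). It serialises and deserialises with a single KryoSerializer instance and hands back only the bytes that were actually written, via DataOutputSerializer.getCopyOfBuffer(), since getSharedBuffer() exposes the whole internal array including unused capacity:
> {code:java}
> // Hedged sketch, not the code under test: one serializer instance for both directions,
> // and only the written bytes are handed to the deserializer.
> fun <T : Any> roundTrip(value: T): T {
>     val config = ExecutionConfig()
>     config.registerKryoType(value.javaClass)
>     val serializer = KryoSerializer(value.javaClass, config)
>
>     val output = DataOutputSerializer(64)   // initial size only; the buffer grows as needed
>     serializer.serialize(value, output)
>     val bytes = output.copyOfBuffer         // copy of the written bytes, not the shared buffer
>
>     return serializer.deserialize(DataInputDeserializer(bytes))
> }
> {code}
>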
> The unit test simply looks like this:
> {code:java}
> @Test
> fun fieldRecord() {
>     val record = getFieldRecord()
>     val result = kryo.serde(record)
>     assertThat(result).isEqualTo(record)
> }
> {code}
> This is the actual vs. expected assertion error.
> The record is huge, and all of the components hash to a different value. I am not sure how the record is being modified.
> {code:java}
> org.opentest4j.AssertionFailedError: 
> expected: "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
>   Flavor         : SPARSE
>   LgK            : 10
>   Merge Flag     : false
>   Error Const    : 0.5887050112577373
>   RSE            : 0.01839703160180429
>   Seed Hash      : 93cc | 37836
>   Num Coupons    : 2
>   Num Pairs (SV) : 2
>   First Inter Col: 0
>   Valid Window   : false
>   Valid PairTable: true
>   Window Offset  : 0
>   KxP            : 1023.375
>   HIP Accum      : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY: 
>    Empty                        : false
>    Direct, Capacity bytes       : false, 
>    Estimation Mode              : false
>    K                            : 128
>    N                            : 2
>    Levels (Needed, Total, Valid): 0, 0, 0
>    Level Bit Pattern            : 0
>    BaseBufferCount              : 2
>    Combined Buffer Capacity     : 4
>    Retained Items               : 2
>    Compact Storage Bytes        : 48
>    Updatable Storage Bytes      : 64
>    Normalized Rank Error        : 1.406%
>    Normalized Rank Error (PMF)  : 1.711%
>    Min Value                    : 1.000000e+00
>    Max Value                    : 3.000000e+00
> ### END SKETCH SUMMARY
> , numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
> n: 1
> min: 3.0
> max: 3.0
> sum: 3.0
> mean: 3.0
> geometric mean: 3.0000000000000004
> variance: 0.0
> population variance: 0.0
> second moment: 0.0
> sum of squares: 9.0
> standard deviation: 0.0
> sum of logs: 1.0986122886681098
> ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
> but was : "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
>   Flavor         : SPARSE
>   LgK            : 10
>   Merge Flag     : false
>   Error Const    : 0.5887050112577373
>   RSE            : 0.01839703160180429
>   Seed Hash      : 93cc | 37836
>   Num Coupons    : 2
>   Num Pairs (SV) : 2
>   First Inter Col: 0
>   Valid Window   : false
>   Valid PairTable: true
>   Window Offset  : 0
>   KxP            : 1023.375
>   HIP Accum      : 2.00012208521548
> ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
> ### Quantiles HeapUpdateDoublesSketch SUMMARY: 
>    Empty                        : false
>    Direct, Capacity bytes       : false, 
>    Estimation Mode              : false
>    K                            : 128
>    N                            : 2
>    Levels (Needed, Total, Valid): 0, 0, 0
>    Level Bit Pattern            : 0
>    BaseBufferCount              : 2
>    Combined Buffer Capacity     : 4
>    Retained Items               : 2
>    Compact Storage Bytes        : 48
>    Updatable Storage Bytes      : 64
>    Normalized Rank Error        : 1.406%
>    Normalized Rank Error (PMF)  : 1.711%
>    Min Value                    : 1.000000e+00
>    Max Value                    : 3.000000e+00
> ### END SKETCH SUMMARY
> , numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
> n: 1
> min: 3.0
> max: 3.0
> sum: 3.0
> mean: 3.0
> geometric mean: 3.0000000000000004
> variance: 0.0
> population variance: 0.0
> second moment: 0.0
> sum of squares: 9.0
> standard deviation: 0.0
> sum of logs: 1.0986122886681098
> ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
>   
> {code}
>  
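> The two printed values above appear identical apart from the trailing identity hash (FieldRecord@3249a1ce vs. FieldRecord@769a58e5), so one possibility worth checking is the equality itself: assertThat(result).isEqualTo(record) relies on equals(), and if FieldRecord or the sketch objects inside it do not override it, two structurally equal instances will still fail the assertion. A hedged sketch of a field-by-field check, assuming AssertJ 3.12+ (usingRecursiveComparison is AssertJ API, not anything Flink-specific); at the very least it would report which fields actually differ:
> {code:java}
> // Hedged sketch: compare field values recursively instead of relying on equals().
> // Assumes AssertJ 3.12+; getFieldRecord() and kryo are the same fixtures as in the test above.
> @Test
> fun fieldRecordRoundTrips() {
>     val record = getFieldRecord()
>     val result = kryo.serde(record)
>     assertThat(result)
>         .usingRecursiveComparison()
>         .isEqualTo(record)
> }
> {code}
>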
> Is there any issue with the way we are serialising and deserialising this?
> Any help is appreciated.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)