Posted to dev@flink.apache.org by "Akshay Hazari (Jira)" <ji...@apache.org> on 2022/04/12 05:36:00 UTC
[jira] [Created] (FLINK-27193) Kyro Serialisation and Deserialisation returns a different object
Akshay Hazari created FLINK-27193:
-------------------------------------
Summary: Kyro Serialisation and Deserialisation returns a different object
Key: FLINK-27193
URL: https://issues.apache.org/jira/browse/FLINK-27193
Project: Flink
Issue Type: Bug
Reporter: Akshay Hazari
We have a unit test that checks whether a Kryo serialisation and deserialisation round trip returns an equal value, but it fails.
We use Flink's KryoSerializer for both directions, like this:
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer

class KryoSerializerExtension {

    // Round trip: serialise t, then deserialise the bytes back into t's runtime class.
    fun <T : Any> serde(t: T): T {
        val bytes = serialize(t)
        return deserialize(bytes, t::class)
    }

    fun serialize(any: Any): ByteArray {
        // Fresh config per call, with the object's class registered with Kryo.
        val config = ExecutionConfig()
        config.registerKryoType(any.javaClass)
        val serializer = KryoSerializer(any.javaClass, config)
        // Initial capacity of 1 byte; the buffer grows as data is written.
        val output = DataOutputSerializer(1)
        serializer.serialize(any, output)
        // sharedBuffer is the backing array itself, which can be longer
        // than the number of bytes actually written.
        return output.sharedBuffer
    }

    fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
        // Same registration as on the write side.
        val config = ExecutionConfig()
        config.registerKryoType(kClass.java)
        val serializer = KryoSerializer(kClass.java, config)
        val input = DataInputDeserializer(bytes)
        return serializer.deserialize(input)
    }
}
The test itself is simple:
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}
This is the assertion error, showing the expected and actual values.
The record is large, and all of its components hash to different values after the round trip. I am not sure what Kryo does that modifies the record.
org.opentest4j.AssertionFailedError:
expected: "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
Flavor : SPARSE
LgK : 10
Merge Flag : false
Error Const : 0.5887050112577373
RSE : 0.01839703160180429
Seed Hash : 93cc | 37836
Num Coupons : 2
Num Pairs (SV) : 2
First Inter Col: 0
Valid Window : false
Valid PairTable: true
Window Offset : 0
KxP : 1023.375
HIP Accum : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY:
Empty : false
Direct, Capacity bytes : false,
Estimation Mode : false
K : 128
N : 2
Levels (Needed, Total, Valid): 0, 0, 0
Level Bit Pattern : 0
BaseBufferCount : 2
Combined Buffer Capacity : 4
Retained Items : 2
Compact Storage Bytes : 48
Updatable Storage Bytes : 64
Normalized Rank Error : 1.406%
Normalized Rank Error (PMF) : 1.711%
Min Value : 1.000000e+00
Max Value : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
but was : "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
Flavor : SPARSE
LgK : 10
Merge Flag : false
Error Const : 0.5887050112577373
RSE : 0.01839703160180429
Seed Hash : 93cc | 37836
Num Coupons : 2
Num Pairs (SV) : 2
First Inter Col: 0
Valid Window : false
Valid PairTable: true
Window Offset : 0
KxP : 1023.375
HIP Accum : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY:
Empty : false
Direct, Capacity bytes : false,
Estimation Mode : false
K : 128
N : 2
Levels (Needed, Total, Valid): 0, 0, 0
Level Bit Pattern : 0
BaseBufferCount : 2
Combined Buffer Capacity : 4
Retained Items : 2
Compact Storage Bytes : 48
Updatable Storage Bytes : 64
Normalized Rank Error : 1.406%
Normalized Rank Error (PMF) : 1.711%
Min Value : 1.000000e+00
Max Value : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
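One thing worth ruling out before suspecting Kryo: apart from the trailing (FieldRecord@...) suffix, the expected and actual strings above are identical. AssertJ appears to add that class-name-and-hex suffix only to tell apart two values whose string forms match, so the differing hex values most likely identify two distinct objects rather than changed fields. A Kotlin data class's generated equals() compares each component using that component's own equals(), so a single nested type that keeps the default, reference-based equals() is enough to make two records with identical state unequal. The minimal sketch below illustrates this. OpaqueSketch, ValueSketch, OpaqueRecord and ValueRecord are hypothetical stand-ins, not the classes in this report, and instead of running Kryo the sketch simply constructs a second instance with the same state, the way a deserialiser produces fresh objects.

```kotlin
// Hypothetical component that overrides toString() but NOT equals(): two
// instances holding the same values are equal only if they are the same object.
class OpaqueSketch(private val values: DoubleArray) {
    override fun toString() = "OpaqueSketch(values=${values.contentToString()})"
}

// Hypothetical component that overrides equals()/hashCode() by content.
class ValueSketch(private val values: DoubleArray) {
    override fun equals(other: Any?) =
        other is ValueSketch && values.contentEquals(other.values)
    override fun hashCode() = values.contentHashCode()
    override fun toString() = "ValueSketch(values=${values.contentToString()})"
}

// Hypothetical records shaped like FieldRecord: a data class's generated
// equals() calls equals() on each component in turn.
data class OpaqueRecord(val name: String, val sketch: OpaqueSketch)
data class ValueRecord(val name: String, val sketch: ValueSketch)

fun main() {
    // Deserialisation builds fresh objects; simulate that by constructing a
    // second, separate instance with the same state (no Kryo involved here).
    val expected = OpaqueRecord("foo", OpaqueSketch(doubleArrayOf(1.0, 3.0)))
    val actual = OpaqueRecord("foo", OpaqueSketch(doubleArrayOf(1.0, 3.0)))
    println(expected.toString() == actual.toString()) // true: identical text
    println(expected == actual)                        // false: OpaqueSketch compares by reference

    val expected2 = ValueRecord("foo", ValueSketch(doubleArrayOf(1.0, 3.0)))
    val actual2 = ValueRecord("foo", ValueSketch(doubleArrayOf(1.0, 3.0)))
    println(expected2 == actual2)                      // true: ValueSketch compares by content
}
```

Giving such a component a content-based equals() with a matching hashCode(), as ValueSketch does, is the usual way to give it value semantics; whether that applies here depends on which classes FieldRecord actually contains.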
Is there an issue with the way we are serialising and deserialising this?
Any help is appreciated.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)