Posted to issues@flink.apache.org by "Akshay Hazari (Jira)" <ji...@apache.org> on 2022/04/12 05:36:00 UTC

[jira] [Created] (FLINK-27193) Kyro Serialisation and Deserialisation returns a different object

Akshay Hazari created FLINK-27193:
-------------------------------------

             Summary: Kyro Serialisation and Deserialisation returns a different object 
                 Key: FLINK-27193
                 URL: https://issues.apache.org/jira/browse/FLINK-27193
             Project: Flink
          Issue Type: Bug
            Reporter: Akshay Hazari


We have a unit test that checks whether a Kryo serialisation/deserialisation round trip returns an equal value, but it fails.

The Kryo serializer and deserializer are used like this:
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer

class KryoSerializerExtension {
  // Round trip: serialize a value with Kryo, then deserialize it into a new instance.
  fun <T : Any> serde(t: T): T {
    val bytes = serialize(t)
    return deserialize(bytes, t::class)
  }

  fun serialize(any: Any): ByteArray {
    val config = ExecutionConfig()
    config.registerKryoType(any.javaClass)
    val serializer = KryoSerializer(any.javaClass, config)
    // The buffer grows on demand; 1 is only the starting capacity.
    val output = DataOutputSerializer(1)
    serializer.serialize(any, output)
    // sharedBuffer exposes the whole internal array, which can be longer than the
    // bytes actually written; copyOfBuffer returns only the written bytes.
    return output.copyOfBuffer
  }

  fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
    val config = ExecutionConfig()
    // Register the same class as on the write side so both serializers agree.
    config.registerKryoType(kClass.java)
    val serializer = KryoSerializer(kClass.java, config)
    val input = DataInputDeserializer(bytes)
    return serializer.deserialize(input)
  }
}
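For comparison, the same round-trip pattern can be sketched with only the JDK's `DataOutputStream`/`DataInputStream` in place of Flink's `KryoSerializer` (a swapped-in technique, not the Flink or Kryo API). `Counter`, `serializeCounter`, `deserializeCounter` and `roundTrip` are hypothetical names; `Counter` is loosely modelled on the `FieldCounter` shown in the error output. Every component compares by value, so the copy is expected to be a new instance that is still equal to the original.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.DataInputStream
import java.io.DataOutputStream

// Hypothetical value type modelled on FieldCounter from the error output.
// All components are Ints, so the generated equals() compares by value.
data class Counter(val populated: Int, val missing: Int)

// Writes the fields into an in-memory buffer; toByteArray() returns exactly
// the bytes written (two 4-byte ints here), with no spare capacity.
fun serializeCounter(c: Counter): ByteArray {
    val bytes = ByteArrayOutputStream()
    DataOutputStream(bytes).use { out ->
        out.writeInt(c.populated)
        out.writeInt(c.missing)
    }
    return bytes.toByteArray()
}

// Reads the fields back in the order they were written, producing a new instance.
fun deserializeCounter(bytes: ByteArray): Counter =
    DataInputStream(ByteArrayInputStream(bytes)).use { input ->
        Counter(input.readInt(), input.readInt())
    }

// Generic round trip with the same shape as serde() above.
fun <T> roundTrip(value: T, ser: (T) -> ByteArray, de: (ByteArray) -> T): T = de(ser(value))
```

With a value-based type like this, `roundTrip(original, ::serializeCounter, ::deserializeCounter)` returns an object that is not the same instance (`!==`) but is still `==` to the original.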
 

The test itself is simple:
@Test
fun fieldRecord() {
  val record = getFieldRecord()
  val result = kryo.serde(record)
  assertThat(result).isEqualTo(record)
}
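`isEqualTo` relies on `equals()`. For a Kotlin data class, the generated `equals()` compares each component with that component's own `equals()`, so a single component type that keeps the default identity-based `equals()` makes any freshly deserialised copy unequal, even when its contents and `toString()` are the same. The sketch below illustrates this with hypothetical stand-ins (`OpaqueSketch`, `MiniFieldRecord`, `simulatedRoundTrip`); it assumes, and does not confirm, that some component of `FieldRecord` behaves this way.

```kotlin
// Hypothetical stand-in for a component class that overrides toString() but NOT
// equals()/hashCode(), so equality falls back to object identity (Any.equals).
class OpaqueSketch(val values: List<Double>) {
    override fun toString(): String = "OpaqueSketch(values=$values)"
}

// Hypothetical, heavily reduced stand-in for FieldRecord. The data-class equals()
// compares each component with that component's own equals().
data class MiniFieldRecord(val name: String, val sketch: OpaqueSketch)

// Simulates what a deserialiser does: rebuilds new instances holding the same data.
fun simulatedRoundTrip(record: MiniFieldRecord): MiniFieldRecord =
    MiniFieldRecord(record.name, OpaqueSketch(record.sketch.values.toList()))
```

Both records print identically, yet `original == copy` is `false`, because the comparison reaches the identity-based `equals()` of `OpaqueSketch`.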
This is the assertion error (expected vs. actual).

The record is large, and all of its components hash to different values after the round trip. I am not sure what Kryo is doing that modifies the record.
org.opentest4j.AssertionFailedError: 

expected: "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"

but was : "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
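Apart from the trailing `(FieldRecord@3249a1ce)` and `(FieldRecord@769a58e5)` tags, the expected and actual strings are identical. AssertJ appends such a tag (simple class name plus identity hash code) when two values have the same `toString()` but are not equal, which suggests the data survives the round trip and the mismatch comes from an `equals()` somewhere in the object graph. One possible remedy, sketched here under that assumption, is to give the wrapping field value semantics by comparing a byte encoding of the wrapped object. `EncodableSketch`, its `toByteArray()`, and `SketchField` are hypothetical; whether a real sketch library's serialized form is canonical enough for this has to be checked against that library's documentation. On the test side, AssertJ's `usingRecursiveComparison()` is an alternative that compares objects field by field.

```kotlin
import java.nio.ByteBuffer

// Hypothetical stand-in for a third-party sketch that does not override equals()
// but can export its state as bytes.
class EncodableSketch(private val values: DoubleArray) {
    fun toByteArray(): ByteArray {
        val buf = ByteBuffer.allocate(values.size * Double.SIZE_BYTES)
        values.forEach { buf.putDouble(it) }
        return buf.array()
    }
}

// Hypothetical wrapper that defines equality by the sketch's encoded bytes, so two
// sketches with the same state compare equal even though they are different instances.
class SketchField(val sketch: EncodableSketch) {
    override fun equals(other: Any?): Boolean =
        other is SketchField && sketch.toByteArray().contentEquals(other.sketch.toByteArray())

    override fun hashCode(): Int = sketch.toByteArray().contentHashCode()
}
```

The trade-off is that `equals()` and `hashCode()` re-encode the sketch on every call, which is usually acceptable in a unit test but worth measuring on a hot path.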
 

Is there an issue with the way we are serialising and deserialising this?

Any help is appreciated.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)