Posted to issues@flink.apache.org by "Akshay Hazari (Jira)" <ji...@apache.org> on 2022/04/12 05:40:00 UTC

[jira] [Updated] (FLINK-27193) Kyro Serialisation and Deserialisation returns a different object

     [ https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Akshay Hazari updated FLINK-27193:
----------------------------------
    Description: 
We have a unit test that checks whether a Kryo serialisation/deserialisation round trip returns an equal value, but it fails.

We use Flink's KryoSerializer for both serialisation and deserialisation, as follows:
{code:java}
import kotlin.reflect.KClass
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.core.memory.DataInputDeserializer
import org.apache.flink.core.memory.DataOutputSerializer

class KryoSerializerExtension {
    // Serialises t with Kryo and immediately deserialises it again.
    fun <T : Any> serde(t: T): T {
        val bytes = serialize(t)
        return deserialize(bytes, t::class)
    }

    fun serialize(any: Any): ByteArray {
        val config = ExecutionConfig()
        config.registerKryoType(any.javaClass)
        val serializer = KryoSerializer(any.javaClass, config)
        // Initial capacity of 1 byte; DataOutputSerializer grows the buffer as needed.
        val output = DataOutputSerializer(1)
        serializer.serialize(any, output)
        // copyOfBuffer holds only the bytes written; sharedBuffer may include unused capacity.
        return output.copyOfBuffer
    }

    fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T {
        val config = ExecutionConfig()
        config.registerKryoType(kClass.java)
        val serializer = KryoSerializer(kClass.java, config)
        val input = DataInputDeserializer(bytes)
        return serializer.deserialize(input)
    }
}
{code}

 

The unit test is simply:
{code:java}
@Test
fun fieldRecord() {
    val record = getFieldRecord()
    val result = kryo.serde(record)
    assertThat(result).isEqualTo(record)
}{code}
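A possible explanation, offered as an assumption that this report does not confirm: if FieldRecord is a Kotlin data class (its toString output has that shape), `isEqualTo` typically ends up calling the data class's `equals()`, which compares each property using that property's own `equals()`. Any nested component that keeps the default identity-based `equals()` would then make even a faithful copy compare unequal. The minimal sketch below reproduces that effect with `java.io` serialization swapped in for Kryo so that it runs without Flink; `Sketch`, `Record`, and `roundTrip` are hypothetical names, not part of the original code.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for a nested component (e.g. a sketch object)
// that does NOT override equals()/hashCode(), so equality is identity.
class Sketch(val n: Long) : Serializable {
    override fun toString() = "Sketch(n=$n)"
}

// Hypothetical stand-in for FieldRecord: a data class, so its equals()
// delegates to each property's equals().
data class Record(val name: String, val sketch: Sketch) : Serializable

// Round-trips a value through java.io serialization (swapped in for Kryo).
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream().also { buf ->
        ObjectOutputStream(buf).use { it.writeObject(value) }
    }.toByteArray()
    return ObjectInputStream(ByteArrayInputStream(bytes)).use { it.readObject() as T }
}

fun main() {
    val record = Record("foo", Sketch(2))
    val copy = roundTrip(record)
    println(copy.toString() == record.toString()) // true: identical printed form
    println(copy == record)                       // false: Sketch compares by identity
}
```

If this is the cause, implementing `equals()`/`hashCode()` on the affected components, or asserting only on the fields the test cares about, would let the assertion compare content rather than identity.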

This is the resulting assertion error (expected vs. actual).

The record is large, and the hash codes of all its components differ after the round trip. I am not sure what Kryo does that modifies the record.
{code:java}
org.opentest4j.AssertionFailedError: 
expected: "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)"
but was : "FieldRecord(name=foo, type=string, fieldCounter=FieldCounter(populated=1, missing=3), countDistinctCalculator=### CPD SKETCH - PREAMBLE:
  Flavor         : SPARSE
  LgK            : 10
  Merge Flag     : false
  Error Const    : 0.5887050112577373
  RSE            : 0.01839703160180429
  Seed Hash      : 93cc | 37836
  Num Coupons    : 2
  Num Pairs (SV) : 2
  First Inter Col: 0
  Valid Window   : false
  Valid PairTable: true
  Window Offset  : 0
  KxP            : 1023.375
  HIP Accum      : 2.00012208521548
### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, frequencies: {test=1}, numericalRangeHistogramCalculator=
### Quantiles HeapUpdateDoublesSketch SUMMARY: 
   Empty                        : false
   Direct, Capacity bytes       : false, 
   Estimation Mode              : false
   K                            : 128
   N                            : 2
   Levels (Needed, Total, Valid): 0, 0, 0
   Level Bit Pattern            : 0
   BaseBufferCount              : 2
   Combined Buffer Capacity     : 4
   Retained Items               : 2
   Compact Storage Bytes        : 48
   Updatable Storage Bytes      : 64
   Normalized Rank Error        : 1.406%
   Normalized Rank Error (PMF)  : 1.711%
   Min Value                    : 1.000000e+00
   Max Value                    : 3.000000e+00
### END SKETCH SUMMARY
, numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics:
n: 1
min: 3.0
max: 3.0
sum: 3.0
mean: 3.0
geometric mean: 3.0000000000000004
variance: 0.0
population variance: 0.0
second moment: 0.0
sum of squares: 9.0
standard deviation: 0.0
sum of logs: 1.0986122886681098
]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)"
  
{code}
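The expected and actual strings above are identical apart from the trailing object identifiers (FieldRecord@3249a1ce vs FieldRecord@769a58e5), which suggests that the printed state survived the round trip and only `equals()` disagrees. If so, one possible workaround for the test is to compare serialized bytes instead of relying on `equals()`. This is a sketch under two assumptions: serialization is deterministic for the type (Kryo output for hash-based collections, for example, is not guaranteed to be), and `java.io` serialization stands in for Kryo so the example runs without Flink. `Counter`, `toBytes`, and `sameSerializedForm` are hypothetical names.

```kotlin
import java.io.ByteArrayOutputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical component that keeps the default, identity-based equals().
class Counter(val hits: Int) : Serializable

// Serialises a value with java.io (standing in for the Kryo helper).
fun toBytes(value: Serializable): ByteArray =
    ByteArrayOutputStream().also { buf ->
        ObjectOutputStream(buf).use { it.writeObject(value) }
    }.toByteArray()

// Compares two objects by their serialised bytes instead of equals().
// Only meaningful when serialisation is deterministic for the type.
fun sameSerializedForm(a: Serializable, b: Serializable): Boolean =
    toBytes(a).contentEquals(toBytes(b))

fun main() {
    val a = Counter(3)
    val b = Counter(3)
    println(a == b)                            // false: identity equals
    println(sameSerializedForm(a, b))          // true: same serialised content
    println(sameSerializedForm(a, Counter(4))) // false: different field value
}
```

In the test above, the same idea would mean comparing `serialize(record)` with `serialize(result)` via `contentEquals`. This checks equivalence only as far as the serializer captures the state, so it complements rather than replaces proper `equals()` implementations.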
 

Is there a problem with the way we are serialising and deserialising this?

Any help is appreciated.


> Kyro Serialisation and Deserialisation returns a different object 
> ------------------------------------------------------------------
>
>                 Key: FLINK-27193
>                 URL: https://issues.apache.org/jira/browse/FLINK-27193
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Akshay Hazari
>            Priority: Not a Priority
>



--
This message was sent by Atlassian Jira
(v8.20.1#820001)