You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Thomas Weise (Jira)" <ji...@apache.org> on 2022/04/26 04:25:00 UTC
[jira] [Resolved] (FLINK-27381) HybridSource split should use Arrays.hashcode
[ https://issues.apache.org/jira/browse/FLINK-27381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Thomas Weise resolved FLINK-27381.
----------------------------------
Resolution: Fixed
> HybridSource split should use Arrays.hashcode
> ---------------------------------------------
>
> Key: FLINK-27381
> URL: https://issues.apache.org/jira/browse/FLINK-27381
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Common
> Affects Versions: 1.14.4
> Reporter: Ran Tao
> Assignee: Ran Tao
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.16.0
>
>
>
> HybridSourceSplit's wrappedSplit has been changed from SourceSplit to serialized wrappedSplitsBytes array. but hashcode methods did not match the byte array. however here we should use Arrays.hashcode for serialized wrappedSplitsBytes.
>
> {code:java}
> public int hashCode(){
> return Objects.hash(wrappedSplitBytes, sourceIndex);
> } {code}
>
> {code:java}
> // old
> public class HybridSourceSplit implements SourceSplit {
> private final SourceSplit wrappedSplit;
> private final int sourceIndex;
> public HybridSourceSplit(int sourceIndex, SourceSplit wrappedSplit) {
> this.sourceIndex = sourceIndex;
> this.wrappedSplit = wrappedSplit;
> }
> public int sourceIndex() {
> return this.sourceIndex;
> }
> public SourceSplit getWrappedSplit() {
> return wrappedSplit;
> }
> @Override
> public int hashCode() {
> return Objects.hash(wrappedSplit, sourceIndex);
> } ...
> } {code}
>
> {code:java}
> // current(master branch)
> public class HybridSourceSplit implements SourceSplit {
> private final byte[] wrappedSplitBytes;
> private final int wrappedSplitSerializerVersion;
> private final int sourceIndex;
> private final String splitId;
> public HybridSourceSplit(
> int sourceIndex, byte[] wrappedSplit, int serializerVersion, String splitId) {
> this.sourceIndex = sourceIndex;
> this.wrappedSplitBytes = wrappedSplit;
> this.wrappedSplitSerializerVersion = serializerVersion;
> this.splitId = splitId;
> }
> public int sourceIndex() {
> return this.sourceIndex;
> }
> public byte[] wrappedSplitBytes() {
> return wrappedSplitBytes;
> }
> @Override
> public int hashCode() {
> return Objects.hash(wrappedSplitBytes, sourceIndex);
> }
> ...
> } {code}
> detail diff:
> [https://github.com/apache/flink/pull/17143/files#diff-cbf6e2386d7457d9084faa23053f1f96ba5f173f5531d0e4a94205497e08df4c]
>
> detail issue:
> https://issues.apache.org/jira/browse/FLINK-24064
>
--
This message was sent by Atlassian Jira
(v8.20.7#820007)