Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/03/09 09:34:00 UTC

[jira] [Updated] (FLINK-25481) SourceIndex comparison in SplitEnumeratorContextProxy

     [ https://issues.apache.org/jira/browse/FLINK-25481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao updated FLINK-25481:
----------------------------
    Component/s: API / Core

> SourceIndex comparison in SplitEnumeratorContextProxy
> -----------------------------------------------------
>
>                 Key: FLINK-25481
>                 URL: https://issues.apache.org/jira/browse/FLINK-25481
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.15.0, 1.13.5, 1.14.2
>            Reporter: Yuhao Bi
>            Priority: Major
>
> In [HybridSourceSplitEnumerator.java|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceSplitEnumerator.java]
> sourceIndex is used by value, but in the following block it is compared by reference ('==' on boxed Integer objects):
> {code:java}
> @Override
> public Map<Integer, ReaderInfo> registeredReaders() {
>     // TODO: not start enumerator until readers are ready?
>     Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
>     if (readers.size() != readerSourceIndex.size()) {
>         return filterRegisteredReaders(readers);
>     }
>     Integer lastIndex = null;
>     for (Integer sourceIndex : readerSourceIndex.values()) {
>         // Integer references compared with the '==' operator
>         if (lastIndex != null && lastIndex != sourceIndex) {
>             return filterRegisteredReaders(readers);
>         }
>         lastIndex = sourceIndex;
>     }
>     return readers;
> }
> private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
>     Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
>     for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
>         // sourceIndex cast to Integer, then compared with the '==' operator
>         if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
>             readersForSource.put(e.getKey(), e.getValue());
>         }
>     }
>     return readersForSource;
> }
>  {code}
> Java caches boxed Integer values in the range -128 to 127, so the code works as long as the source indexes stay within that range, but if my understanding is correct it would be better to replace '==' with an .equals method call.
>  
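For illustration, here is a minimal standalone sketch of the value-based comparison suggested above. It is not the actual Flink patch: the class BoxedIndexComparison and the helpers allSameIndex and filterByIndex are hypothetical names, and a plain Map<Integer, Integer> stands in for readerSourceIndex. It only assumes that the current source index is available as an int.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical standalone class, not part of Flink.
class BoxedIndexComparison {

    // Returns true when every reader maps to the same source index.
    // Uses equals() so the result does not depend on the Integer cache.
    static boolean allSameIndex(Map<Integer, Integer> readerSourceIndex) {
        Integer lastIndex = null;
        for (Integer sourceIndex : readerSourceIndex.values()) {
            if (lastIndex != null && !lastIndex.equals(sourceIndex)) {
                return false;
            }
            lastIndex = sourceIndex;
        }
        return true;
    }

    // Keeps only the readers whose source index equals currentIndex.
    // Objects.equals is null-safe and compares the boxed values, not references.
    static Map<Integer, Integer> filterByIndex(
            Map<Integer, Integer> readerSourceIndex, int currentIndex) {
        Map<Integer, Integer> result = new HashMap<>(readerSourceIndex.size());
        for (Map.Entry<Integer, Integer> e : readerSourceIndex.entrySet()) {
            if (Objects.equals(e.getValue(), currentIndex)) {
                result.put(e.getKey(), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> readerSourceIndex = new HashMap<>();
        // 1000 lies outside the range the Integer cache is guaranteed to cover.
        readerSourceIndex.put(0, 1000);
        readerSourceIndex.put(1, 1000);
        System.out.println(allSameIndex(readerSourceIndex));
        System.out.println(filterByIndex(readerSourceIndex, 1000).size());
    }
}
```

With '==' in place of equals(), the same checks could fail for indexes outside the cached range, because two boxed Integer objects holding the same value are not guaranteed to be the same object.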