You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/05/06 11:06:00 UTC

[jira] [Assigned] (FLINK-27529) HybridSourceSplitEnumerator sourceIndex using error Integer check

     [ https://issues.apache.org/jira/browse/FLINK-27529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser reassigned FLINK-27529:
--------------------------------------

    Assignee: Ran Tao

> HybridSourceSplitEnumerator sourceIndex using error Integer check
> -----------------------------------------------------------------
>
>                 Key: FLINK-27529
>                 URL: https://issues.apache.org/jira/browse/FLINK-27529
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Common
>    Affects Versions: 1.15.0, 1.14.4, 1.15.1
>            Reporter: Ran Tao
>            Assignee: Ran Tao
>            Priority: Major
>              Labels: pull-request-available
>
> Currently HybridSourceSplitEnumerator check readerSourceIndex using Integer type but == operator.  In some case, it will cause error(Integer == only works fine in [-128,127]) we can use Integer.equals instead. But actually readerSourceIndex is primitive int intrinsically,so we can change Integer to int to check sourceIndex instead of Integer.equals method.  it will be more elegant.
> {code:java}
> @Override
>         public Map<Integer, ReaderInfo> registeredReaders() {
>             // TODO: not start enumerator until readers are ready?
>             Map<Integer, ReaderInfo> readers = realContext.registeredReaders();
>             if (readers.size() != readerSourceIndex.size()) {
>                 return filterRegisteredReaders(readers);
>             }
>             Integer lastIndex = null;
>             for (Integer sourceIndex : readerSourceIndex.values()) {
>                 if (lastIndex != null && lastIndex != sourceIndex) {
>                     return filterRegisteredReaders(readers);
>                 }
>                 lastIndex = sourceIndex;
>             }
>             return readers;
>         }
>         private Map<Integer, ReaderInfo> filterRegisteredReaders(Map<Integer, ReaderInfo> readers) {
>             Map<Integer, ReaderInfo> readersForSource = new HashMap<>(readers.size());
>             for (Map.Entry<Integer, ReaderInfo> e : readers.entrySet()) {
>                 if (readerSourceIndex.get(e.getKey()) == (Integer) sourceIndex) {
>                     readersForSource.put(e.getKey(), e.getValue());
>                 }
>             }
>             return readersForSource;
>         }
> {code}
> {code:java}
>     private void sendSwitchSourceEvent(int subtaskId, int sourceIndex) {
>         readerSourceIndex.put(subtaskId, sourceIndex);
>         ......
>    }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)