Posted to issues@flink.apache.org by "Zhipeng Zhang (Jira)" <ji...@apache.org> on 2022/06/24 09:09:00 UTC

[jira] [Updated] (FLINK-26263) Add Check for data size in LogisticRegression

     [ https://issues.apache.org/jira/browse/FLINK-26263?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhipeng Zhang updated FLINK-26263:
----------------------------------
    Summary: Add Check for data size in LogisticRegression  (was: Check data size in LogisticRegression)

> Add Check for data size in LogisticRegression
> ---------------------------------------------
>
>                 Key: FLINK-26263
>                 URL: https://issues.apache.org/jira/browse/FLINK-26263
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.0.0
>            Reporter: Yunfeng Zhou
>            Assignee: Yunfeng Zhou
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.1.0
>
>
> In Flink ML LogisticRegression, the algorithm fails if the parallelism is larger than the input data size. For example, if we add the following code to `LogisticRegressionTest.testFitAndPredict()`:
> ```java
> env.setParallelism(12);
> ```
> then the test case fails with the following exception:
> ```
> Caused by: java.lang.IllegalArgumentException: bound must be positive
>     at java.base/java.util.Random.nextInt(Random.java:388)
>     at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.getMiniBatchData(LogisticRegression.java:351)
>     at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.onEpochWatermarkIncremented(LogisticRegression.java:381)
>     at org.apache.flink.iteration.operator.AbstractWrapperOperator.notifyEpochWatermarkIncrement(AbstractWrapperOperator.java:129)
>     at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.lambda$1(AbstractAllRoundWrapperOperator.java:105)
>     at org.apache.flink.iteration.operator.OperatorUtils.processOperatorOrUdfIfSatisfy(OperatorUtils.java:79)
>     at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.onEpochWatermarkIncrement(AbstractAllRoundWrapperOperator.java:102)
>     at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.tryUpdateLowerBound(OperatorEpochWatermarkTracker.java:79)
>     at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.onEpochWatermark(OperatorEpochWatermarkTracker.java:63)
>     at org.apache.flink.iteration.operator.AbstractWrapperOperator.onEpochWatermarkEvent(AbstractWrapperOperator.java:121)
>     at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement(TwoInputAllRoundWrapperOperator.java:77)
>     at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement2(TwoInputAllRoundWrapperOperator.java:59)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:225)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:194)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.base/java.lang.Thread.run(Thread.java:834)
> ```
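> The cause of this exception is that LogisticRegression does not handle the case where the input data size is 0: as the stack trace shows, `getMiniBatchData` then calls `Random.nextInt` with a non-positive bound. This can be resolved by adding a check for empty input before sampling.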
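
The failure mode and the kind of check described above can be sketched in isolation. This is only an illustration, not the Flink ML code or the actual fix: the class `MiniBatchSketch` and the helper `sampleMiniBatch` are hypothetical names, and the sketch assumes the mini-batch is drawn by picking random indices from a locally cached list, which the stack trace suggests but does not confirm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class MiniBatchSketch {

    // Hypothetical helper (not the Flink ML implementation): draws batchSize
    // elements with replacement from a locally cached list of records.
    static <T> List<T> sampleMiniBatch(List<T> cache, int batchSize, Random random) {
        List<T> batch = new ArrayList<>();
        // Guard: an empty cache would make random.nextInt(0) throw
        // IllegalArgumentException("bound must be positive").
        if (cache.isEmpty()) {
            return batch;
        }
        for (int i = 0; i < batchSize; i++) {
            batch.add(cache.get(random.nextInt(cache.size())));
        }
        return batch;
    }

    public static void main(String[] args) {
        Random random = new Random(0);

        // Without a guard, a bound of 0 reproduces the reported exception.
        try {
            random.nextInt(0);
        } catch (IllegalArgumentException e) {
            System.out.println("unguarded: " + e.getMessage());
        }

        // With the guard, an empty cache yields an empty batch instead.
        List<Double> empty = new ArrayList<>();
        System.out.println("guarded batch size: " + sampleMiniBatch(empty, 4, random).size());
    }
}
```

Returning an empty batch is one possible design; whether an empty subtask should skip the training step, contribute a zero update, or fail fast with a clearer message is a decision for the actual fix.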



--
This message was sent by Atlassian Jira
(v8.20.7#820007)