Posted to dev@flink.apache.org by "Yunfeng Zhou (Jira)" <ji...@apache.org> on 2022/02/21 01:44:00 UTC

[jira] [Created] (FLINK-26263) Check data size in LogisticRegression

Yunfeng Zhou created FLINK-26263:
------------------------------------

             Summary: Check data size in LogisticRegression
                 Key: FLINK-26263
                 URL: https://issues.apache.org/jira/browse/FLINK-26263
             Project: Flink
          Issue Type: Bug
          Components: Library / Machine Learning
    Affects Versions: ml-2.0.0
            Reporter: Yunfeng Zhou


In Flink ML's LogisticRegression, training fails if the parallelism is larger than the number of input records. For example, suppose we add the following line to `LogisticRegressionTest.testFitAndPredict()`:

```java
env.setParallelism(12);
```

The test case then fails with the following exception:

```
Caused by: java.lang.IllegalArgumentException: bound must be positive
    at java.base/java.util.Random.nextInt(Random.java:388)
    at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.getMiniBatchData(LogisticRegression.java:351)
    at org.apache.flink.ml.classification.logisticregression.LogisticRegression$CacheDataAndDoTrain.onEpochWatermarkIncremented(LogisticRegression.java:381)
    at org.apache.flink.iteration.operator.AbstractWrapperOperator.notifyEpochWatermarkIncrement(AbstractWrapperOperator.java:129)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.lambda$1(AbstractAllRoundWrapperOperator.java:105)
    at org.apache.flink.iteration.operator.OperatorUtils.processOperatorOrUdfIfSatisfy(OperatorUtils.java:79)
    at org.apache.flink.iteration.operator.allround.AbstractAllRoundWrapperOperator.onEpochWatermarkIncrement(AbstractAllRoundWrapperOperator.java:102)
    at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.tryUpdateLowerBound(OperatorEpochWatermarkTracker.java:79)
    at org.apache.flink.iteration.progresstrack.OperatorEpochWatermarkTracker.onEpochWatermark(OperatorEpochWatermarkTracker.java:63)
    at org.apache.flink.iteration.operator.AbstractWrapperOperator.onEpochWatermarkEvent(AbstractWrapperOperator.java:121)
    at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement(TwoInputAllRoundWrapperOperator.java:77)
    at org.apache.flink.iteration.operator.allround.TwoInputAllRoundWrapperOperator.processElement2(TwoInputAllRoundWrapperOperator.java:59)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord2(StreamTwoInputProcessorFactory.java:225)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$1(StreamTwoInputProcessorFactory.java:194)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:86)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Thread.java:834)
```

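The exception occurs because LogisticRegression does not handle the case where the input data size is 0. When the parallelism exceeds the number of input records, at least one subtask receives no data, and `getMiniBatchData` then presumably calls `Random.nextInt` with a bound of 0, which is what the stack trace reports. This can be resolved by adding a check for empty input.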
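As an illustration of the kind of check meant here, the following is a minimal sketch and not the actual Flink ML code. The class `MiniBatchGuard` and the method `sampleMiniBatch` are hypothetical names, and the sketch assumes the mini-batch is drawn by picking random indices from a locally cached list of records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

public class MiniBatchGuard {

    /**
     * Hypothetical helper (not part of Flink ML): samples batchSize records,
     * with replacement, from the records cached on one subtask.
     */
    public static <T> List<T> sampleMiniBatch(List<T> cached, int batchSize, Random random) {
        // Guard: a subtask that received no records has nothing to sample.
        // Without it, random.nextInt(0) throws
        // IllegalArgumentException("bound must be positive").
        if (cached.isEmpty()) {
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            // cached.size() > 0 here, so the bound is always positive.
            batch.add(cached.get(random.nextInt(cached.size())));
        }
        return batch;
    }

    public static void main(String[] args) {
        Random random = new Random(0);
        // Empty subtask input: returns an empty batch instead of throwing.
        System.out.println(sampleMiniBatch(new ArrayList<String>(), 4, random).size());
        // Non-empty input: returns a batch of the requested size.
        System.out.println(sampleMiniBatch(Arrays.asList("a", "b"), 4, random).size());
    }
}
```

Whether an empty subtask should skip the epoch, emit a zero gradient, or trigger a rebalancing of the input is a design decision for the real fix; the sketch only shows where the guard would sit.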


