You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/11/23 06:54:11 UTC
[flink-ml] branch master updated: [FLINK-30144] Enable object reuse in Flink ML tests
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 24eb3ec [FLINK-30144] Enable object reuse in Flink ML tests
24eb3ec is described below
commit 24eb3ec84b72c0cfb170ea381095b25a8fb8a830
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Wed Nov 23 14:54:05 2022 +0800
[FLINK-30144] Enable object reuse in Flink ML tests
This closes #179.
---
.../src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java | 1 +
.../test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java | 1 +
flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java | 1 +
.../src/test/java/org/apache/flink/ml/api/PipelineTest.java | 1 +
.../org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java | 1 +
.../org/apache/flink/ml/common/datastream/AllReduceImplTest.java | 4 ++++
.../org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java | 1 +
.../test/java/org/apache/flink/ml/common/window/WindowsTest.java | 1 +
.../src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java | 1 +
.../java/org/apache/flink/iteration/IterationConstructionTest.java | 5 +++++
.../compile/DraftExecutionEnvironmentSwitchWrapperTest.java | 1 +
.../progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java | 6 ++++++
flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java | 1 +
.../src/test/java/org/apache/flink/ml/classification/KnnTest.java | 1 +
.../test/java/org/apache/flink/ml/classification/LinearSVCTest.java | 1 +
.../org/apache/flink/ml/classification/LogisticRegressionTest.java | 1 +
.../java/org/apache/flink/ml/classification/NaiveBayesTest.java | 1 +
.../flink/ml/classification/OnlineLogisticRegressionTest.java | 1 +
.../org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java | 1 +
.../src/test/java/org/apache/flink/ml/clustering/KMeansTest.java | 1 +
.../test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java | 1 +
.../flink/ml/evaluation/BinaryClassificationEvaluatorTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/BinarizerTest.java | 2 +-
.../src/test/java/org/apache/flink/ml/feature/BucketizerTest.java | 1 +
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java | 1 +
.../java/org/apache/flink/ml/feature/ElementwiseProductTest.java | 1 +
.../test/java/org/apache/flink/ml/feature/FeatureHasherTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/HashingTFTest.java | 1 +
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/ImputerTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/InteractionTest.java | 2 +-
.../test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/NGramTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/NormalizerTest.java | 2 +-
.../test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java | 1 +
.../java/org/apache/flink/ml/feature/PolynomialExpansionTest.java | 2 +-
.../test/java/org/apache/flink/ml/feature/RandomSplitterTest.java | 1 +
.../test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java | 1 +
.../test/java/org/apache/flink/ml/feature/StandardScalerTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/TokenizerTest.java | 1 +
.../org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java | 1 +
.../test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java | 1 +
.../test/java/org/apache/flink/ml/feature/VectorIndexerTest.java | 1 +
.../src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java | 1 +
.../flink/ml/feature/stringindexer/IndexToStringModelTest.java | 1 +
.../apache/flink/ml/feature/stringindexer/StringIndexerTest.java | 1 +
.../java/org/apache/flink/ml/regression/LinearRegressionTest.java | 1 +
.../src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java | 1 +
flink-ml-python/pyflink/ml/tests/test_utils.py | 1 +
.../flink/test/iteration/BoundedAllRoundCheckpointITCase.java | 1 +
.../flink/test/iteration/BoundedAllRoundStreamIterationITCase.java | 2 ++
.../test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java | 1 +
.../flink/test/iteration/BoundedPerRoundCheckpointITCase.java | 1 +
.../flink/test/iteration/BoundedPerRoundStreamIterationITCase.java | 1 +
.../apache/flink/test/iteration/UnboundedStreamIterationITCase.java | 2 ++
58 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java
index fb8e272..e414b6b 100644
--- a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java
+++ b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java
@@ -57,6 +57,7 @@ public class BenchmarkTest extends AbstractTestBase {
@Test
public void testRunBenchmark() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
Map<String, Map<String, ?>> params = new HashMap<>();
diff --git a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java
index 25e9930..17cb47d 100644
--- a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java
+++ b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java
@@ -51,6 +51,7 @@ public class DataGeneratorTest {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java
index 6cad1b1..2f8c0ff 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java
@@ -52,6 +52,7 @@ public class GraphTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java
index 80c0024..0da5bf9 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java
@@ -47,6 +47,7 @@ public class PipelineTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java
index 08f9498..1cb75a5 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java
@@ -108,6 +108,7 @@ public class BroadcastUtilsTest {
true);
}
});
+ env.getConfig().enableObjectReuse();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(NUM_SLOT * NUM_TM);
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java
index 6b0136b..044ce8d 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java
@@ -80,6 +80,7 @@ public class AllReduceImplTest {
@Test
public void testAllReduce() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(parallelism);
DataStream<double[]> elements =
env.fromParallelCollection(
@@ -118,6 +119,7 @@ public class AllReduceImplTest {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(parallelism);
DataStream<double[]> elements =
env.fromParallelCollection(
@@ -148,6 +150,7 @@ public class AllReduceImplTest {
try {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(parallelism);
DataStream<double[]> elements =
env.fromParallelCollection(
@@ -168,6 +171,7 @@ public class AllReduceImplTest {
@Test
public void testAllReduceWithEmptyInput() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(parallelism);
DataStream<double[]> elements =
env.fromParallelCollection(
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java
index fb5e553..f59e6d4 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java
@@ -50,6 +50,7 @@ public class DataStreamUtilsTest {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java
index 5991e94..9389eb5 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java
@@ -55,6 +55,7 @@ public class WindowsTest extends AbstractTestBase {
@BeforeClass
public static void beforeClass() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
inputData = new ArrayList<>();
for (long i = 0; i < RECORD_NUM; i++) {
inputData.add(i);
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java
index 10596b3..eda5236 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java
@@ -54,6 +54,7 @@ public class ReadWriteUtilsTest extends AbstractTestBase {
new org.apache.flink.configuration.Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
index 5844b5a..4d02f5a 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
@@ -46,6 +46,7 @@ public class IterationConstructionTest extends TestLogger {
@Test
public void testEmptyIterationBody() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
DataStream<Integer> variableSource =
env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
@@ -81,6 +82,7 @@ public class IterationConstructionTest extends TestLogger {
@Test
public void testNotChainingHeadOperator() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
DataStream<Integer> variableSource =
env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
@@ -119,6 +121,7 @@ public class IterationConstructionTest extends TestLogger {
@Test
public void testUnboundedIteration() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
DataStream<Integer> variableSource1 =
env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
.setParallelism(2)
@@ -218,6 +221,7 @@ public class IterationConstructionTest extends TestLogger {
@Test
public void testBoundedIterationWithTerminationCriteria() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
DataStream<Integer> variableSource1 =
env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
.setParallelism(2)
@@ -322,6 +326,7 @@ public class IterationConstructionTest extends TestLogger {
@Test
public void testReplayedIteration() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
DataStream<Integer> variableSource =
env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
.setParallelism(2)
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java
index b544003..c958ff5 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java
@@ -54,6 +54,7 @@ public class DraftExecutionEnvironmentSwitchWrapperTest extends TestLogger {
@Test
public void testSwitchingOperatorWrappers() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
DataStream<Integer> source =
env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {});
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
index 1eb493a..5980655 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
@@ -58,6 +58,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
@Test
public void testChainedOperator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
env.addSource(new EmptySource())
.transform(
@@ -72,6 +73,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
@Test
public void testOneInputOperator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.addSource(new EmptySource())
.setParallelism(4)
.transform(
@@ -87,6 +89,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
@Test
public void testUnionedOneInput() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.addSource(new EmptySource())
.setParallelism(4)
.union(env.addSource(new EmptySource()).setParallelism(3))
@@ -104,6 +107,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
@Test
public void testTwoInputOperator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.addSource(new EmptySource())
.setParallelism(4)
.connect(env.addSource(new EmptySource()).setParallelism(3))
@@ -120,6 +124,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
@Test
public void testUnionedTwoInputOperator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.addSource(new EmptySource())
.setParallelism(4)
.union(env.addSource(new EmptySource()).setParallelism(2))
@@ -137,6 +142,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
@Test
public void testMultipleInputOperator() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
DataStream<Integer> first =
env.addSource(new EmptySource())
.setParallelism(4)
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java
index 3346764..3b11e1f 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java
@@ -77,6 +77,7 @@ public class FunctionsTest extends AbstractTestBase {
@Before
public void before() {
env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
tEnv = StreamTableEnvironment.create(env);
}
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java
index 7597e33..04dadb8 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java
@@ -98,6 +98,7 @@ public class KnnTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java
index 06a87ec..d7aff6e 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java
@@ -90,6 +90,7 @@ public class LinearSVCTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java
index 6b6d4be..8766036 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java
@@ -107,6 +107,7 @@ public class LogisticRegressionTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
index efb62b9..a4a1545 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
@@ -67,6 +67,7 @@ public class NaiveBayesTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java
index 81d0dc4..956e947 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java
@@ -185,6 +185,7 @@ public class OnlineLogisticRegressionTest extends TestLogger {
miniCluster.start();
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(defaultParallelism);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java
index 99f5ca3..155648c 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java
@@ -139,6 +139,7 @@ public class AgglomerativeClusteringTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(3);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java
index 2f432e9..2e73a9d 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java
@@ -94,6 +94,7 @@ public class KMeansTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java
index 994a9af..db0dd08 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java
@@ -167,6 +167,7 @@ public class OnlineKMeansTest extends TestLogger {
miniCluster.start();
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(defaultParallelism);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java
index c039586..ab2286f 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java
@@ -129,6 +129,7 @@ public class BinaryClassificationEvaluatorTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(3);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java
index 9285555..9b27c3c 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java
@@ -83,7 +83,7 @@ public class BinarizerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java
index 6f64a2e..9d27d75 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java
@@ -81,6 +81,7 @@ public class BucketizerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java
index e2146e2..f9cc402 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java
@@ -75,6 +75,7 @@ public class DCTTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
index cc2041f..270d75b 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
@@ -82,6 +82,7 @@ public class ElementwiseProductTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java
index 941b3b1..7360bad 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java
@@ -61,6 +61,7 @@ public class FeatureHasherTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java
index 5b170ab..0448afa 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java
@@ -86,6 +86,7 @@ public class HashingTFTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java
index e4cae5a..84dc341 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java
@@ -74,6 +74,7 @@ public class IDFTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java
index f788a39..55a12ff 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java
@@ -117,6 +117,7 @@ public class ImputerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java
index acf959d..3d905bb 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java
@@ -94,7 +94,7 @@ public class InteractionTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java
index 17ad6b5..25b4d31 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java
@@ -127,6 +127,7 @@ public class KBinsDiscretizerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java
index 68e443d..ea9e080 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java
@@ -117,6 +117,7 @@ public class MaxAbsScalerTest {
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java
index 1bfb209..470ec82 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java
@@ -86,6 +86,7 @@ public class MinMaxScalerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java
index 2c9f162..3cb76fb 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java
@@ -58,6 +58,7 @@ public class NGramTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java
index e0045ef..a932c88 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java
@@ -96,7 +96,7 @@ public class NormalizerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java
index a84fdc2..d7b1e5f 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java
@@ -70,6 +70,7 @@ public class OneHotEncoderTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
index a61e7d8..c7cfb90 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
@@ -87,7 +87,7 @@ public class PolynomialExpansionTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java
index aeb0245..63cb362 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java
@@ -56,6 +56,7 @@ public class RandomSplitterTest extends AbstractTestBase {
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
index af5381d..fff1094 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
@@ -57,6 +57,7 @@ public class RegexTokenizerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java
index 2b1e430..aca8dbf 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java
@@ -94,6 +94,7 @@ public class RobustScalerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java
index 48a2664..6359975 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java
@@ -91,6 +91,7 @@ public class StandardScalerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
index cacbec5..8ce4140 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
@@ -61,6 +61,7 @@ public class TokenizerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java
index 217e061..95f9779 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java
@@ -88,6 +88,7 @@ public class VarianceThresholdSelectorTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java
index f22d013..51bd49b 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java
@@ -131,6 +131,7 @@ public class VectorAssemblerTest extends AbstractTestBase {
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(2);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java
index 17ffda5..560c05a 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java
@@ -64,6 +64,7 @@ public class VectorIndexerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java
index 4a75987..8e9c070 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java
@@ -75,6 +75,7 @@ public class VectorSlicerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java
index 937b829..bad3eac 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java
@@ -63,6 +63,7 @@ public class IndexToStringModelTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java
index 08b36a1..44c6c35 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java
@@ -84,6 +84,7 @@ public class StringIndexerTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java
index 5b5af18..5c20109 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java
@@ -90,6 +90,7 @@ public class LinearRegressionTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java
index 417455c..4e77409 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java
@@ -109,6 +109,7 @@ public class ChiSqTestTest extends AbstractTestBase {
Configuration config = new Configuration();
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+ env.getConfig().enableObjectReuse();
env.setParallelism(4);
env.enableCheckpointing(100);
env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-python/pyflink/ml/tests/test_utils.py b/flink-ml-python/pyflink/ml/tests/test_utils.py
index 4b376dc..442de7d 100644
--- a/flink-ml-python/pyflink/ml/tests/test_utils.py
+++ b/flink-ml-python/pyflink/ml/tests/test_utils.py
@@ -39,6 +39,7 @@ def update_existing_params(target: JavaWithParams, source: JavaWithParams):
class PyFlinkMLTestCase(unittest.TestCase):
def setUp(self):
self.env = StreamExecutionEnvironment.get_execution_environment()
+ self.env.get_config().enable_object_reuse()
self._load_dependency_jars()
config = Configuration(
j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment))
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java
index 57be57f..c770b8e 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java
@@ -141,6 +141,7 @@ public class BoundedAllRoundCheckpointITCase extends TestLogger {
true);
}
});
+ env.getConfig().enableObjectReuse();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1);
DataStream<EpochRecord> variableSource =
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java
index 534c1ea..e840f5b 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java
@@ -165,6 +165,7 @@ public class BoundedAllRoundStreamIterationITCase extends TestLogger {
boolean terminationCriteriaFollowsConstantsStreams,
SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
DataStream<EpochRecord> variableSource =
env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {})
@@ -228,6 +229,7 @@ public class BoundedAllRoundStreamIterationITCase extends TestLogger {
int maxRound,
SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
DataStream<EpochRecord> variableSource =
env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {})
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java
index ec06e64..4399079 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java
@@ -111,6 +111,7 @@ public class BoundedMixedLifeCycleStreamIterationITCase extends TestLogger {
SharedReference<BlockingQueue<OutputRecord<Integer>>> allRoundResult,
SharedReference<BlockingQueue<OutputRecord<Integer>>> perRoundResult) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
DataStream<EpochRecord> allRoundVariableSource =
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java
index 3f5e3da..f0ca7df 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java
@@ -134,6 +134,7 @@ public class BoundedPerRoundCheckpointITCase extends TestLogger {
true);
}
});
+ env.getConfig().enableObjectReuse();
env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
env.setParallelism(1);
DataStream<Integer> variableSource =
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java
index 4b7e54e..5f453b7 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java
@@ -95,6 +95,7 @@ public class BoundedPerRoundStreamIterationITCase extends TestLogger {
int maxRound,
SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
DataStream<Integer> variableSource = env.fromElements(0);
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java
index f3f2272..206ba21 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java
@@ -174,6 +174,7 @@ public class UnboundedStreamIterationITCase extends TestLogger {
boolean doBroadcast,
SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
DataStream<EpochRecord> source =
env.addSource(new SequenceSource(numRecordsPerSource, holdSource, period))
@@ -223,6 +224,7 @@ public class UnboundedStreamIterationITCase extends TestLogger {
int maxRound,
SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
env.setParallelism(1);
DataStream<EpochRecord> variableSource =
env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {})