You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/11/23 06:54:11 UTC

[flink-ml] branch master updated: [FLINK-30144] Enable object reuse in Flink ML tests

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 24eb3ec  [FLINK-30144] Enable object reuse in Flink ML tests
24eb3ec is described below

commit 24eb3ec84b72c0cfb170ea381095b25a8fb8a830
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Wed Nov 23 14:54:05 2022 +0800

    [FLINK-30144] Enable object reuse in Flink ML tests
    
    This closes #179.
---
 .../src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java  | 1 +
 .../test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java  | 1 +
 flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java  | 1 +
 .../src/test/java/org/apache/flink/ml/api/PipelineTest.java         | 1 +
 .../org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java    | 1 +
 .../org/apache/flink/ml/common/datastream/AllReduceImplTest.java    | 4 ++++
 .../org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java  | 1 +
 .../test/java/org/apache/flink/ml/common/window/WindowsTest.java    | 1 +
 .../src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java  | 1 +
 .../java/org/apache/flink/iteration/IterationConstructionTest.java  | 5 +++++
 .../compile/DraftExecutionEnvironmentSwitchWrapperTest.java         | 1 +
 .../progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java     | 6 ++++++
 flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/classification/KnnTest.java   | 1 +
 .../test/java/org/apache/flink/ml/classification/LinearSVCTest.java | 1 +
 .../org/apache/flink/ml/classification/LogisticRegressionTest.java  | 1 +
 .../java/org/apache/flink/ml/classification/NaiveBayesTest.java     | 1 +
 .../flink/ml/classification/OnlineLogisticRegressionTest.java       | 1 +
 .../org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/clustering/KMeansTest.java    | 1 +
 .../test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java  | 1 +
 .../flink/ml/evaluation/BinaryClassificationEvaluatorTest.java      | 1 +
 .../src/test/java/org/apache/flink/ml/feature/BinarizerTest.java    | 2 +-
 .../src/test/java/org/apache/flink/ml/feature/BucketizerTest.java   | 1 +
 flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java | 1 +
 .../java/org/apache/flink/ml/feature/ElementwiseProductTest.java    | 1 +
 .../test/java/org/apache/flink/ml/feature/FeatureHasherTest.java    | 1 +
 .../src/test/java/org/apache/flink/ml/feature/HashingTFTest.java    | 1 +
 flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/ImputerTest.java      | 1 +
 .../src/test/java/org/apache/flink/ml/feature/InteractionTest.java  | 2 +-
 .../test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java | 1 +
 .../src/test/java/org/apache/flink/ml/feature/NGramTest.java        | 1 +
 .../src/test/java/org/apache/flink/ml/feature/NormalizerTest.java   | 2 +-
 .../test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java    | 1 +
 .../java/org/apache/flink/ml/feature/PolynomialExpansionTest.java   | 2 +-
 .../test/java/org/apache/flink/ml/feature/RandomSplitterTest.java   | 1 +
 .../test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java | 1 +
 .../test/java/org/apache/flink/ml/feature/StandardScalerTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/feature/TokenizerTest.java    | 1 +
 .../org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java  | 1 +
 .../test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java  | 1 +
 .../test/java/org/apache/flink/ml/feature/VectorIndexerTest.java    | 1 +
 .../src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java | 1 +
 .../flink/ml/feature/stringindexer/IndexToStringModelTest.java      | 1 +
 .../apache/flink/ml/feature/stringindexer/StringIndexerTest.java    | 1 +
 .../java/org/apache/flink/ml/regression/LinearRegressionTest.java   | 1 +
 .../src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java      | 1 +
 flink-ml-python/pyflink/ml/tests/test_utils.py                      | 1 +
 .../flink/test/iteration/BoundedAllRoundCheckpointITCase.java       | 1 +
 .../flink/test/iteration/BoundedAllRoundStreamIterationITCase.java  | 2 ++
 .../test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java  | 1 +
 .../flink/test/iteration/BoundedPerRoundCheckpointITCase.java       | 1 +
 .../flink/test/iteration/BoundedPerRoundStreamIterationITCase.java  | 1 +
 .../apache/flink/test/iteration/UnboundedStreamIterationITCase.java | 2 ++
 58 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java
index fb8e272..e414b6b 100644
--- a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java
+++ b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/BenchmarkTest.java
@@ -57,6 +57,7 @@ public class BenchmarkTest extends AbstractTestBase {
     @Test
     public void testRunBenchmark() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         Map<String, Map<String, ?>> params = new HashMap<>();
diff --git a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java
index 25e9930..17cb47d 100644
--- a/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java
+++ b/flink-ml-benchmark/src/test/java/org/apache/flink/ml/benchmark/DataGeneratorTest.java
@@ -51,6 +51,7 @@ public class DataGeneratorTest {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java
index 6cad1b1..2f8c0ff 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/api/GraphTest.java
@@ -52,6 +52,7 @@ public class GraphTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java
index 80c0024..0da5bf9 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java
@@ -47,6 +47,7 @@ public class PipelineTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java
index 08f9498..1cb75a5 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/broadcast/BroadcastUtilsTest.java
@@ -108,6 +108,7 @@ public class BroadcastUtilsTest {
                                         true);
                             }
                         });
+        env.getConfig().enableObjectReuse();
         env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
         env.setParallelism(NUM_SLOT * NUM_TM);
 
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java
index 6b0136b..044ce8d 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/AllReduceImplTest.java
@@ -80,6 +80,7 @@ public class AllReduceImplTest {
         @Test
         public void testAllReduce() throws Exception {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.getConfig().enableObjectReuse();
             env.setParallelism(parallelism);
             DataStream<double[]> elements =
                     env.fromParallelCollection(
@@ -118,6 +119,7 @@ public class AllReduceImplTest {
             try {
                 StreamExecutionEnvironment env =
                         StreamExecutionEnvironment.getExecutionEnvironment();
+                env.getConfig().enableObjectReuse();
                 env.setParallelism(parallelism);
                 DataStream<double[]> elements =
                         env.fromParallelCollection(
@@ -148,6 +150,7 @@ public class AllReduceImplTest {
             try {
                 StreamExecutionEnvironment env =
                         StreamExecutionEnvironment.getExecutionEnvironment();
+                env.getConfig().enableObjectReuse();
                 env.setParallelism(parallelism);
                 DataStream<double[]> elements =
                         env.fromParallelCollection(
@@ -168,6 +171,7 @@ public class AllReduceImplTest {
         @Test
         public void testAllReduceWithEmptyInput() throws Exception {
             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+            env.getConfig().enableObjectReuse();
             env.setParallelism(parallelism);
             DataStream<double[]> elements =
                     env.fromParallelCollection(
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java
index fb5e553..f59e6d4 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java
@@ -50,6 +50,7 @@ public class DataStreamUtilsTest {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java
index 5991e94..9389eb5 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/common/window/WindowsTest.java
@@ -55,6 +55,7 @@ public class WindowsTest extends AbstractTestBase {
     @BeforeClass
     public static void beforeClass() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         inputData = new ArrayList<>();
         for (long i = 0; i < RECORD_NUM; i++) {
             inputData.add(i);
diff --git a/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java b/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java
index 10596b3..eda5236 100644
--- a/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java
+++ b/flink-ml-core/src/test/java/org/apache/flink/ml/util/ReadWriteUtilsTest.java
@@ -54,6 +54,7 @@ public class ReadWriteUtilsTest extends AbstractTestBase {
                 new org.apache.flink.configuration.Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
index 5844b5a..4d02f5a 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/IterationConstructionTest.java
@@ -46,6 +46,7 @@ public class IterationConstructionTest extends TestLogger {
     @Test
     public void testEmptyIterationBody() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         DataStream<Integer> variableSource =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
@@ -81,6 +82,7 @@ public class IterationConstructionTest extends TestLogger {
     @Test
     public void testNotChainingHeadOperator() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         DataStream<Integer> variableSource =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
@@ -119,6 +121,7 @@ public class IterationConstructionTest extends TestLogger {
     @Test
     public void testUnboundedIteration() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         DataStream<Integer> variableSource1 =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                         .setParallelism(2)
@@ -218,6 +221,7 @@ public class IterationConstructionTest extends TestLogger {
     @Test
     public void testBoundedIterationWithTerminationCriteria() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         DataStream<Integer> variableSource1 =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                         .setParallelism(2)
@@ -322,6 +326,7 @@ public class IterationConstructionTest extends TestLogger {
     @Test
     public void testReplayedIteration() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         DataStream<Integer> variableSource =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {})
                         .setParallelism(2)
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java
index b544003..c958ff5 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/compile/DraftExecutionEnvironmentSwitchWrapperTest.java
@@ -54,6 +54,7 @@ public class DraftExecutionEnvironmentSwitchWrapperTest extends TestLogger {
     @Test
     public void testSwitchingOperatorWrappers() {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         DataStream<Integer> source =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {});
 
diff --git a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
index 1eb493a..5980655 100644
--- a/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
+++ b/flink-ml-iteration/src/test/java/org/apache/flink/iteration/progresstrack/OperatorEpochWatermarkTrackerFactoryTest.java
@@ -58,6 +58,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
     @Test
     public void testChainedOperator() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
         env.addSource(new EmptySource())
                 .transform(
@@ -72,6 +73,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
     @Test
     public void testOneInputOperator() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.addSource(new EmptySource())
                 .setParallelism(4)
                 .transform(
@@ -87,6 +89,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
     @Test
     public void testUnionedOneInput() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.addSource(new EmptySource())
                 .setParallelism(4)
                 .union(env.addSource(new EmptySource()).setParallelism(3))
@@ -104,6 +107,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
     @Test
     public void testTwoInputOperator() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.addSource(new EmptySource())
                 .setParallelism(4)
                 .connect(env.addSource(new EmptySource()).setParallelism(3))
@@ -120,6 +124,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
     @Test
     public void testUnionedTwoInputOperator() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.addSource(new EmptySource())
                 .setParallelism(4)
                 .union(env.addSource(new EmptySource()).setParallelism(2))
@@ -137,6 +142,7 @@ public class OperatorEpochWatermarkTrackerFactoryTest extends TestLogger {
     @Test
     public void testMultipleInputOperator() throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         DataStream<Integer> first =
                 env.addSource(new EmptySource())
                         .setParallelism(4)
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java
index 3346764..3b11e1f 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/FunctionsTest.java
@@ -77,6 +77,7 @@ public class FunctionsTest extends AbstractTestBase {
     @Before
     public void before() {
         env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         tEnv = StreamTableEnvironment.create(env);
     }
 
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java
index 7597e33..04dadb8 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/KnnTest.java
@@ -98,6 +98,7 @@ public class KnnTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java
index 06a87ec..d7aff6e 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LinearSVCTest.java
@@ -90,6 +90,7 @@ public class LinearSVCTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java
index 6b6d4be..8766036 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/LogisticRegressionTest.java
@@ -107,6 +107,7 @@ public class LogisticRegressionTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
index efb62b9..a4a1545 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/NaiveBayesTest.java
@@ -67,6 +67,7 @@ public class NaiveBayesTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java
index 81d0dc4..956e947 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/classification/OnlineLogisticRegressionTest.java
@@ -185,6 +185,7 @@ public class OnlineLogisticRegressionTest extends TestLogger {
         miniCluster.start();
 
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(defaultParallelism);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java
index 99f5ca3..155648c 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/AgglomerativeClusteringTest.java
@@ -139,6 +139,7 @@ public class AgglomerativeClusteringTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(3);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java
index 2f432e9..2e73a9d 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java
@@ -94,6 +94,7 @@ public class KMeansTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java
index 994a9af..db0dd08 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/OnlineKMeansTest.java
@@ -167,6 +167,7 @@ public class OnlineKMeansTest extends TestLogger {
         miniCluster.start();
 
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(defaultParallelism);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java
index c039586..ab2286f 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/evaluation/BinaryClassificationEvaluatorTest.java
@@ -129,6 +129,7 @@ public class BinaryClassificationEvaluatorTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(3);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java
index 9285555..9b27c3c 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BinarizerTest.java
@@ -83,7 +83,7 @@ public class BinarizerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java
index 6f64a2e..9d27d75 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/BucketizerTest.java
@@ -81,6 +81,7 @@ public class BucketizerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java
index e2146e2..f9cc402 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/DCTTest.java
@@ -75,6 +75,7 @@ public class DCTTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
index cc2041f..270d75b 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ElementwiseProductTest.java
@@ -82,6 +82,7 @@ public class ElementwiseProductTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java
index 941b3b1..7360bad 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/FeatureHasherTest.java
@@ -61,6 +61,7 @@ public class FeatureHasherTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java
index 5b170ab..0448afa 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/HashingTFTest.java
@@ -86,6 +86,7 @@ public class HashingTFTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java
index e4cae5a..84dc341 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/IDFTest.java
@@ -74,6 +74,7 @@ public class IDFTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java
index f788a39..55a12ff 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/ImputerTest.java
@@ -117,6 +117,7 @@ public class ImputerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java
index acf959d..3d905bb 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/InteractionTest.java
@@ -94,7 +94,7 @@ public class InteractionTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java
index 17ad6b5..25b4d31 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/KBinsDiscretizerTest.java
@@ -127,6 +127,7 @@ public class KBinsDiscretizerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java
index 68e443d..ea9e080 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MaxAbsScalerTest.java
@@ -117,6 +117,7 @@ public class MaxAbsScalerTest {
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
 
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java
index 1bfb209..470ec82 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/MinMaxScalerTest.java
@@ -86,6 +86,7 @@ public class MinMaxScalerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java
index 2c9f162..3cb76fb 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NGramTest.java
@@ -58,6 +58,7 @@ public class NGramTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java
index e0045ef..a932c88 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/NormalizerTest.java
@@ -96,7 +96,7 @@ public class NormalizerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java
index a84fdc2..d7b1e5f 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/OneHotEncoderTest.java
@@ -70,6 +70,7 @@ public class OneHotEncoderTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
index a61e7d8..c7cfb90 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
@@ -87,7 +87,7 @@ public class PolynomialExpansionTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
-
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java
index aeb0245..63cb362 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RandomSplitterTest.java
@@ -56,6 +56,7 @@ public class RandomSplitterTest extends AbstractTestBase {
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
 
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
index af5381d..fff1094 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RegexTokenizerTest.java
@@ -57,6 +57,7 @@ public class RegexTokenizerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java
index 2b1e430..aca8dbf 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/RobustScalerTest.java
@@ -94,6 +94,7 @@ public class RobustScalerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java
index 48a2664..6359975 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/StandardScalerTest.java
@@ -91,6 +91,7 @@ public class StandardScalerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
index cacbec5..8ce4140 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/TokenizerTest.java
@@ -61,6 +61,7 @@ public class TokenizerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java
index 217e061..95f9779 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VarianceThresholdSelectorTest.java
@@ -88,6 +88,7 @@ public class VarianceThresholdSelectorTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java
index f22d013..51bd49b 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorAssemblerTest.java
@@ -131,6 +131,7 @@ public class VectorAssemblerTest extends AbstractTestBase {
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
 
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(2);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java
index 17ffda5..560c05a 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorIndexerTest.java
@@ -64,6 +64,7 @@ public class VectorIndexerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java
index 4a75987..8e9c070 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/VectorSlicerTest.java
@@ -75,6 +75,7 @@ public class VectorSlicerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java
index 937b829..bad3eac 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/IndexToStringModelTest.java
@@ -63,6 +63,7 @@ public class IndexToStringModelTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java
index 08b36a1..44c6c35 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/stringindexer/StringIndexerTest.java
@@ -84,6 +84,7 @@ public class StringIndexerTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java
index 5b5af18..5c20109 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/regression/LinearRegressionTest.java
@@ -90,6 +90,7 @@ public class LinearRegressionTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java
index 417455c..4e77409 100644
--- a/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/stats/ChiSqTestTest.java
@@ -109,6 +109,7 @@ public class ChiSqTestTest extends AbstractTestBase {
         Configuration config = new Configuration();
         config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.getConfig().enableObjectReuse();
         env.setParallelism(4);
         env.enableCheckpointing(100);
         env.setRestartStrategy(RestartStrategies.noRestart());
diff --git a/flink-ml-python/pyflink/ml/tests/test_utils.py b/flink-ml-python/pyflink/ml/tests/test_utils.py
index 4b376dc..442de7d 100644
--- a/flink-ml-python/pyflink/ml/tests/test_utils.py
+++ b/flink-ml-python/pyflink/ml/tests/test_utils.py
@@ -39,6 +39,7 @@ def update_existing_params(target: JavaWithParams, source: JavaWithParams):
 class PyFlinkMLTestCase(unittest.TestCase):
     def setUp(self):
         self.env = StreamExecutionEnvironment.get_execution_environment()
+        self.env.get_config().enable_object_reuse()
         self._load_dependency_jars()
         config = Configuration(
             j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment))
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java
index 57be57f..c770b8e 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundCheckpointITCase.java
@@ -141,6 +141,7 @@ public class BoundedAllRoundCheckpointITCase extends TestLogger {
                                         true);
                             }
                         });
+        env.getConfig().enableObjectReuse();
         env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
         env.setParallelism(1);
         DataStream<EpochRecord> variableSource =
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java
index 534c1ea..e840f5b 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedAllRoundStreamIterationITCase.java
@@ -165,6 +165,7 @@ public class BoundedAllRoundStreamIterationITCase extends TestLogger {
             boolean terminationCriteriaFollowsConstantsStreams,
             SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
         DataStream<EpochRecord> variableSource =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {})
@@ -228,6 +229,7 @@ public class BoundedAllRoundStreamIterationITCase extends TestLogger {
             int maxRound,
             SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
         DataStream<EpochRecord> variableSource =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {})
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java
index ec06e64..4399079 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedMixedLifeCycleStreamIterationITCase.java
@@ -111,6 +111,7 @@ public class BoundedMixedLifeCycleStreamIterationITCase extends TestLogger {
             SharedReference<BlockingQueue<OutputRecord<Integer>>> allRoundResult,
             SharedReference<BlockingQueue<OutputRecord<Integer>>> perRoundResult) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
 
         DataStream<EpochRecord> allRoundVariableSource =
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java
index 3f5e3da..f0ca7df 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundCheckpointITCase.java
@@ -134,6 +134,7 @@ public class BoundedPerRoundCheckpointITCase extends TestLogger {
                                         true);
                             }
                         });
+        env.getConfig().enableObjectReuse();
         env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
         env.setParallelism(1);
         DataStream<Integer> variableSource =
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java
index 4b7e54e..5f453b7 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/BoundedPerRoundStreamIterationITCase.java
@@ -95,6 +95,7 @@ public class BoundedPerRoundStreamIterationITCase extends TestLogger {
             int maxRound,
             SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
 
         DataStream<Integer> variableSource = env.fromElements(0);
diff --git a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java
index f3f2272..206ba21 100644
--- a/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java
+++ b/flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java
@@ -174,6 +174,7 @@ public class UnboundedStreamIterationITCase extends TestLogger {
             boolean doBroadcast,
             SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
         DataStream<EpochRecord> source =
                 env.addSource(new SequenceSource(numRecordsPerSource, holdSource, period))
@@ -223,6 +224,7 @@ public class UnboundedStreamIterationITCase extends TestLogger {
             int maxRound,
             SharedReference<BlockingQueue<OutputRecord<Integer>>> result) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
         env.setParallelism(1);
         DataStream<EpochRecord> variableSource =
                 env.addSource(new DraftExecutionEnvironment.EmptySource<EpochRecord>() {})