You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/18 04:15:02 UTC

[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #177: [FLINK-30014][hotfix] ensure saving non-null value in aggregate util function state

zhipeng93 commented on code in PR #177:
URL: https://github.com/apache/flink-ml/pull/177#discussion_r1025970532


##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java:
##########
@@ -86,6 +87,29 @@ public void testAggregate() throws Exception {
         assertEquals("190", stringSum.get(0));
     }
 
+    @Test
+    public void testAggregateWithNonNeutralInitialAccumulator() throws Exception {
+        DataStream<Long> dataStream =
+                env.fromParallelCollection(new NumberSequenceIterator(0L, 19L), Types.LONG);
+        DataStream<String> result =
+                DataStreamUtils.aggregate(
+                        dataStream, new TestAggregateFuncWithNonNeutralInitialAccumulator());
+        List<String> stringSumList = IteratorUtils.toList(result.executeAndCollect());
+        assertEquals(1, stringSumList.size());
+        String stringSum1 = stringSumList.get(0);
+
+        env.setParallelism(env.getParallelism() + 1);
+        dataStream = env.fromParallelCollection(new NumberSequenceIterator(0L, 19L), Types.LONG);
+        result =
+                DataStreamUtils.aggregate(
+                        dataStream, new TestAggregateFuncWithNonNeutralInitialAccumulator());
+        stringSumList = IteratorUtils.toList(result.executeAndCollect());
+        assertEquals(1, stringSumList.size());
+        String stringSum2 = stringSumList.get(0);
+
+        assertNotEquals(stringSum1, stringSum2);

Review Comment:
   Let's directly check the value of the output.



##########
flink-ml-core/src/test/java/org/apache/flink/ml/common/datastream/DataStreamUtilsTest.java:
##########
@@ -86,6 +87,29 @@ public void testAggregate() throws Exception {
         assertEquals("190", stringSum.get(0));
     }
 
+    @Test
+    public void testAggregateWithNonNeutralInitialAccumulator() throws Exception {
+        DataStream<Long> dataStream =
+                env.fromParallelCollection(new NumberSequenceIterator(0L, 19L), Types.LONG);
+        DataStream<String> result =
+                DataStreamUtils.aggregate(
+                        dataStream, new TestAggregateFuncWithNonNeutralInitialAccumulator());
+        List<String> stringSumList = IteratorUtils.toList(result.executeAndCollect());
+        assertEquals(1, stringSumList.size());
+        String stringSum1 = stringSumList.get(0);
+
+        env.setParallelism(env.getParallelism() + 1);

Review Comment:
   Let's reset the parallelism as the default value at the end of this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org