Posted to issues@flink.apache.org by "jiangxin369 (via GitHub)" <gi...@apache.org> on 2023/04/04 06:30:20 UTC

[GitHub] [flink-ml] jiangxin369 opened a new pull request, #229: [FLINK-31255] Wraps the operator config about serializer

jiangxin369 opened a new pull request, #229:
URL: https://github.com/apache/flink-ml/pull/229

   
   ## What is the purpose of the change
   
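   Wraps the serializer-related settings of the operator config: when `OperatorUtils.createWrappedOperatorConfig` builds the config for a wrapped operator, each `IterationRecordSerializer` taken from the wrapper operator's config is replaced by its inner serializer.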
   
   ## Brief change log
   
     - Wraps the serializer-related operator config.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not applicable)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1165403137


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {

Review Comment:
   Yes, in the current usage `typeSerializerIn` and `typeSerializerOut` should always be `IterationRecordSerializer`s. But since this is a public utility method, I think we had better check that each serializer is non-null and an instance of `IterationRecordSerializer`.
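
For context on the check proposed above: in Java, `instanceof` evaluates to `false` for a `null` reference, so a single `instanceof IterationRecordSerializer` test already covers both conditions. A minimal, self-contained sketch of that behavior follows. `Serializer`, `WrappingSerializer`, `PlainSerializer` and `unwrapIfWrapped` are hypothetical stand-ins for illustration, not Flink classes and not the code in this PR.

```java
class InstanceofNullSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    interface Serializer {}

    /** Hypothetical stand-in for a serializer that wraps an inner serializer. */
    static final class WrappingSerializer implements Serializer {
        private final Serializer inner;

        WrappingSerializer(Serializer inner) {
            this.inner = inner;
        }

        Serializer getInnerSerializer() {
            return inner;
        }
    }

    /** Hypothetical stand-in for any other serializer. */
    static final class PlainSerializer implements Serializer {}

    /** Returns the inner serializer when wrapped; otherwise returns the argument as-is. */
    static Serializer unwrapIfWrapped(Serializer serializer) {
        // instanceof is false for null, so no separate null check is needed.
        if (serializer instanceof WrappingSerializer) {
            return ((WrappingSerializer) serializer).getInnerSerializer();
        }
        return serializer;
    }

    public static void main(String[] args) {
        Serializer plain = new PlainSerializer();
        // Each line checks one case: wrapped, not wrapped, and null.
        System.out.println(unwrapIfWrapped(new WrappingSerializer(plain)) == plain);
        System.out.println(unwrapIfWrapped(plain) == plain);
        System.out.println(unwrapIfWrapped(null) == null);
    }
}
```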





[GitHub] [flink-ml] lindong28 commented on pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on PR #229:
URL: https://github.com/apache/flink-ml/pull/229#issuecomment-1512467383

   Thanks for the update. LGTM.




[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1168384091


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +110,50 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);

Review Comment:
   Renaming `wrapperConfig` to `config` is a good idea, but I don't think it matters whether we read from `wrapperConfig` or `wrappedConfig` here. `wrappedConfig` is just a clone of `wrapperConfig`: we read the existing settings from `wrapperConfig` and set the new ones on `wrappedConfig`, just like the code above:
   ```
   KeySelector keySelector = config.getStatePartitioner(i, cl);
   if (keySelector != null) {
       checkState(
               keySelector instanceof ProxyKeySelector,
               "The state partitioner for the wrapper operator should always be ProxyKeySelector, but it is "
                       + keySelector);
       wrappedConfig.setStatePartitioner(
               i, ((ProxyKeySelector) keySelector).getWrappedKeySelector());
   }
   ```
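
The clone-then-override pattern described in this comment can be illustrated without Flink. In the sketch below a plain `Map` stands in for `StreamConfig`, and the `"wrapped:"` prefix stands in for a wrapper type such as `ProxyKeySelector`. Both are assumptions made purely for illustration, as are the names `CloneThenOverrideSketch` and `createWrappedConfig`.

```java
import java.util.HashMap;
import java.util.Map;

class CloneThenOverrideSketch {

    // Hypothetical marker for a value that wraps an inner value.
    static final String WRAPPED_PREFIX = "wrapped:";

    /** Copies the wrapper's settings, then replaces wrapped values with their inner values. */
    static Map<String, String> createWrappedConfig(Map<String, String> wrapperConfig) {
        // Start from a copy, so unrelated settings carry over unchanged.
        Map<String, String> wrappedConfig = new HashMap<>(wrapperConfig);
        // Read from the original, write to the copy; the original is never modified.
        for (Map.Entry<String, String> entry : wrapperConfig.entrySet()) {
            String value = entry.getValue();
            if (value != null && value.startsWith(WRAPPED_PREFIX)) {
                wrappedConfig.put(entry.getKey(), value.substring(WRAPPED_PREFIX.length()));
            }
        }
        return wrappedConfig;
    }

    public static void main(String[] args) {
        Map<String, String> wrapperConfig = new HashMap<>();
        wrapperConfig.put("statePartitioner", "wrapped:byUserId");
        wrapperConfig.put("operatorName", "map");
        System.out.println(createWrappedConfig(wrapperConfig));
        System.out.println(wrapperConfig);
    }
}
```

Because the copy starts out equal to the original, reading a setting from either one gives the same value until that setting is overridden on the copy.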





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1168688436


##########
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/OperatorUtilsTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the {@link OperatorUtils}. */
+public class OperatorUtilsTest extends TestLogger {
+
+    @Test
+    public void testCreateWrappedOperatorConfig() throws Exception {
+        StreamOperatorFactory<IterationRecord<Integer>> wrapperFactory =
+                new WrapperOperatorFactory<>(
+                        SimpleOperatorFactory.of(new MockTwoInputStreamOperator()),
+                        new AllRoundOperatorWrapper<>());
+
+        OperatorID operatorId = new OperatorID();
+
+        new StreamTaskMailboxTestHarnessBuilder<>(
+                        TwoInputStreamTask::new,
+                        new IterationRecordTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
+                .build();
+    }
+
+    @Test
+    public void testCreateWrappedOperatorConfigWithSideOutput() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        final OutputTag<Integer> outputTag = new OutputTag("0", Types.INT) {};
+        DataStream<Integer> variableSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {});
+
+        Iterations.iterateUnboundedStreams(
+                DataStreamList.of(variableSource),
+                DataStreamList.of(),
+                (variableStreams, dataStreams) -> {
+                    DataStream<Integer> variable = variableStreams.get(0);
+                    SingleOutputStreamOperator transformed =
+                            variable.transform(
+                                    "side-output",
+                                    Types.INT,
+                                    new MockSideOutputOperator<>(outputTag));
+                    return new IterationBodyResult(
+                            DataStreamList.of(variable),
+                            DataStreamList.of(transformed.getSideOutput(outputTag)));
+                });
+        env.execute();

Review Comment:
   `BroadcastOutputTest.java` uses `BroadcastOutput` directly and `OperatorEpochWatermarkTrackerFactoryTest.java` uses `OperatorEpochWatermarkTracker` directly. But `OperatorUtilsTest` does not use `OperatorUtils` directly. Thus it seems better to move this test to e.g. `UnboundedStreamIterationITCase`.







[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "zhipeng93 (via GitHub)" <gi...@apache.org>.
zhipeng93 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1161461542


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {

Review Comment:
   It seems that `typeSerializerIn` and `typeSerializerOut` should always be `IterationRecordSerializer` for wrapped iteration operators if they are not null.
   
   Is there any case in which this condition does not hold?



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {
+                    wrappedConfig.setInputs(
+                            new NetworkInputConfig(
+                                    ((IterationRecordSerializer<?>) typeSerializerIn)
+                                            .getInnerSerializer(),
+                                    i));
+                }
+            }
+        }
+
+        TypeSerializer<?> typeSerializerOut = wrapperConfig.getTypeSerializerOut(cl);
+        if (typeSerializerOut instanceof IterationRecordSerializer) {
+            wrappedConfig.setTypeSerializerOut(
+                    ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());
+        }
+
+        TypeSerializer<?> stateKeySerializer = wrapperConfig.getStateKeySerializer(cl);
+        if (stateKeySerializer instanceof IterationRecordSerializer) {
+            wrappedConfig.setTypeSerializerOut(

Review Comment:
   It should be `wrappedConfig.setStateKeySerializer(...)` here.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1166360085


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +110,50 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);

Review Comment:
   Should we use `wrappedConfig` here?
   
   Maybe rename `wrapperConfig` to `config` so that it is easier to distinguish from `wrappedConfig`.



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +110,50 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {
+                    inputs[i] =
+                            new NetworkInputConfig(
+                                    ((IterationRecordSerializer<?>) typeSerializerIn)
+                                            .getInnerSerializer(),
+                                    i);
+                }
+            }
+        }
+        wrappedConfig.setInputs(inputs);
+
+        TypeSerializer<?> typeSerializerOut = wrapperConfig.getTypeSerializerOut(cl);
+        if (typeSerializerOut instanceof IterationRecordSerializer) {

Review Comment:
   Should we throw an exception if `typeSerializerOut` is not an `IterationRecordSerializer`?
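
If the stricter behavior raised in this comment were adopted, the unwrapping step could fail fast instead of silently skipping an unexpected serializer. The sketch below is one way that might look. The nested types and `unwrapOrThrow` are hypothetical stand-ins rather than Flink classes, and a plain `IllegalStateException` is thrown where the real code would more likely use a precondition helper such as the `checkState` call quoted elsewhere in this thread.

```java
class FailFastUnwrapSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    interface Serializer {}

    /** Hypothetical stand-in for a serializer that wraps an inner serializer. */
    static final class WrappingSerializer implements Serializer {
        private final Serializer inner;

        WrappingSerializer(Serializer inner) {
            this.inner = inner;
        }

        Serializer getInnerSerializer() {
            return inner;
        }
    }

    /** Hypothetical stand-in for any other serializer. */
    static final class PlainSerializer implements Serializer {}

    /** Unwraps the serializer, or throws if it is null or not a WrappingSerializer. */
    static Serializer unwrapOrThrow(Serializer serializer, String role) {
        // instanceof is false for null, so null is rejected here as well.
        if (!(serializer instanceof WrappingSerializer)) {
            throw new IllegalStateException(
                    "The " + role + " serializer should be a WrappingSerializer, but it is "
                            + serializer);
        }
        return ((WrappingSerializer) serializer).getInnerSerializer();
    }

    public static void main(String[] args) {
        Serializer inner = new PlainSerializer();
        System.out.println(unwrapOrThrow(new WrappingSerializer(inner), "output") == inner);
        try {
            unwrapOrThrow(inner, "output");
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Whether a `null` serializer should be tolerated (for example, for an operator without a regular output) is a separate decision; this sketch rejects it.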



##########
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/OperatorUtilsTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the {@link OperatorUtils}. */
+public class OperatorUtilsTest extends TestLogger {
+
+    @Test
+    public void testCreateWrappedOperatorConfig() throws Exception {
+        StreamOperatorFactory<IterationRecord<Integer>> wrapperFactory =
+                new WrapperOperatorFactory<>(
+                        SimpleOperatorFactory.of(new MockTwoInputStreamOperator()),
+                        new AllRoundOperatorWrapper<>());
+
+        OperatorID operatorId = new OperatorID();
+
+        new StreamTaskMailboxTestHarnessBuilder<>(
+                        TwoInputStreamTask::new,
+                        new IterationRecordTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
+                .build();
+    }
+
+    @Test
+    public void testCreateWrappedOperatorConfigWithSideOutput() throws Exception {

Review Comment:
   Do we need both tests? Would it be simpler to have one test that uses both the normal output and a side output?



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +110,50 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {
+                    inputs[i] =
+                            new NetworkInputConfig(
+                                    ((IterationRecordSerializer<?>) typeSerializerIn)
+                                            .getInnerSerializer(),
+                                    i);
+                }
+            }
+        }
+        wrappedConfig.setInputs(inputs);
+
+        TypeSerializer<?> typeSerializerOut = wrapperConfig.getTypeSerializerOut(cl);
+        if (typeSerializerOut instanceof IterationRecordSerializer) {
+            wrappedConfig.setTypeSerializerOut(
+                    ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());
+        }
+
+        Stream.concat(
+                        wrapperConfig.getChainedOutputs(cl).stream(),
+                        wrapperConfig.getNonChainedOutputs(cl).stream())
+                .forEach(
+                        edge -> {
+                            OutputTag<?> outputTag = edge.getOutputTag();
+                            if (outputTag != null) {
+                                TypeSerializer<?> typeSerializerSideOut =
+                                        wrapperConfig.getTypeSerializerSideOut(outputTag, cl);
+                                if (typeSerializerOut instanceof IterationRecordSerializer) {

Review Comment:
   Why do we check the `typeSerializerOut` here? Do you mean to check `typeSerializerSideOut`?
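
The concern in this comment is that the branch taken for a side output should be decided by that side output's own serializer. A small sketch of the intended shape follows, under the assumption that each side output is unwrapped independently. A `Map` keyed by tag id stands in for the per-`OutputTag` lookup, and all types as well as the method `unwrapSideOutputs` are hypothetical, not Flink's API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class SideOutputUnwrapSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    interface Serializer {}

    /** Hypothetical stand-in for a serializer that wraps an inner serializer. */
    static final class WrappingSerializer implements Serializer {
        private final Serializer inner;

        WrappingSerializer(Serializer inner) {
            this.inner = inner;
        }

        Serializer getInnerSerializer() {
            return inner;
        }
    }

    /** Hypothetical stand-in for any other serializer. */
    static final class PlainSerializer implements Serializer {}

    /** Returns a new map in which every wrapped side-output serializer is unwrapped. */
    static Map<String, Serializer> unwrapSideOutputs(Map<String, Serializer> sideOutputs) {
        Map<String, Serializer> result = new LinkedHashMap<>();
        for (Map.Entry<String, Serializer> entry : sideOutputs.entrySet()) {
            Serializer sideOutSerializer = entry.getValue();
            // The check is on the side-output serializer itself,
            // not on the serializer of the main output.
            if (sideOutSerializer instanceof WrappingSerializer) {
                result.put(
                        entry.getKey(),
                        ((WrappingSerializer) sideOutSerializer).getInnerSerializer());
            } else {
                result.put(entry.getKey(), sideOutSerializer);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Serializer inner = new PlainSerializer();
        Serializer plain = new PlainSerializer();
        Map<String, Serializer> sideOutputs = new LinkedHashMap<>();
        sideOutputs.put("0", new WrappingSerializer(inner));
        sideOutputs.put("1", plain);
        Map<String, Serializer> unwrapped = unwrapSideOutputs(sideOutputs);
        System.out.println(unwrapped.get("0") == inner);
        System.out.println(unwrapped.get("1") == plain);
    }
}
```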



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {

Review Comment:
   Should we throw an exception if the condition is false?



##########
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/OperatorUtilsTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the {@link OperatorUtils}. */
+public class OperatorUtilsTest extends TestLogger {
+
+    @Test
+    public void testCreateWrappedOperatorConfig() throws Exception {
+        StreamOperatorFactory<IterationRecord<Integer>> wrapperFactory =
+                new WrapperOperatorFactory<>(
+                        SimpleOperatorFactory.of(new MockTwoInputStreamOperator()),
+                        new AllRoundOperatorWrapper<>());
+
+        OperatorID operatorId = new OperatorID();
+
+        new StreamTaskMailboxTestHarnessBuilder<>(
+                        TwoInputStreamTask::new,
+                        new IterationRecordTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
+                .build();
+    }
+
+    @Test
+    public void testCreateWrappedOperatorConfigWithSideOutput() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        final OutputTag<Integer> outputTag = new OutputTag("0", Types.INT) {};
+        DataStream<Integer> variableSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {});
+
+        Iterations.iterateUnboundedStreams(
+                DataStreamList.of(variableSource),
+                DataStreamList.of(),
+                (variableStreams, dataStreams) -> {
+                    DataStream<Integer> variable = variableStreams.get(0);
+                    SingleOutputStreamOperator transformed =
+                            variable.transform(
+                                    "side-output",
+                                    Types.INT,
+                                    new MockSideOutputOperator<>(outputTag));
+                    return new IterationBodyResult(
+                            DataStreamList.of(variable),
+                            DataStreamList.of(transformed.getSideOutput(outputTag)));
+                });
+        env.execute();

Review Comment:
   Can we additionally check the value of the output?
   
   Since the test actually starts a job in a mini-cluster, would it be simpler to add the test to a test class in flink-ml-tests?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] lindong28 merged pull request #229: [FLINK-31255] OperatorUtils#createWrappedOperatorConfig should update input and sideOutput serializers

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 merged PR #229:
URL: https://github.com/apache/flink-ml/pull/229




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1169428050


##########
flink-ml-tests/src/test/java/org/apache/flink/test/iteration/UnboundedStreamIterationITCase.java:
##########
@@ -152,6 +160,39 @@ public void testVariableAndConstantBoundedIteration() throws Exception {
         assertEquals(OutputRecord.Event.TERMINATED, result.get().take().getEvent());
     }
 
+    @Test
+    public void testUnwrapOperatorConfig() throws Exception {

Review Comment:
   Can we name this test based on the key property of the API used by this test?
   
   For example, if the key property here is the use of SideOutput, maybe name it `testBoundedIterationWithSideoutput`.





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1161619708


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {

Review Comment:
   Yes, you're right. I removed the `if ... instanceof ...` check. It also seems that we do not modify the `StateKeySerializer` at all, so I removed that part as well.





[GitHub] [flink-ml] jiangxin369 commented on pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on PR #229:
URL: https://github.com/apache/flink-ml/pull/229#issuecomment-1499866189

   @zhipeng93 Could you help review this PR? Also, I have no idea why the workflow needs additional approval to run in this PR. Could you approve the workflow run?




[GitHub] [flink-ml] jiangxin369 commented on pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on PR #229:
URL: https://github.com/apache/flink-ml/pull/229#issuecomment-1510974647

   @lindong28 Thanks for the review. I updated the PR. Could you take another look?




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1168688436


##########
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/OperatorUtilsTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the {@link OperatorUtils}. */
+public class OperatorUtilsTest extends TestLogger {
+
+    @Test
+    public void testCreateWrappedOperatorConfig() throws Exception {
+        StreamOperatorFactory<IterationRecord<Integer>> wrapperFactory =
+                new WrapperOperatorFactory<>(
+                        SimpleOperatorFactory.of(new MockTwoInputStreamOperator()),
+                        new AllRoundOperatorWrapper<>());
+
+        OperatorID operatorId = new OperatorID();
+
+        new StreamTaskMailboxTestHarnessBuilder<>(
+                        TwoInputStreamTask::new,
+                        new IterationRecordTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
+                .build();
+    }
+
+    @Test
+    public void testCreateWrappedOperatorConfigWithSideOutput() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        final OutputTag<Integer> outputTag = new OutputTag("0", Types.INT) {};
+        DataStream<Integer> variableSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {});
+
+        Iterations.iterateUnboundedStreams(
+                DataStreamList.of(variableSource),
+                DataStreamList.of(),
+                (variableStreams, dataStreams) -> {
+                    DataStream<Integer> variable = variableStreams.get(0);
+                    SingleOutputStreamOperator transformed =
+                            variable.transform(
+                                    "side-output",
+                                    Types.INT,
+                                    new MockSideOutputOperator<>(outputTag));
+                    return new IterationBodyResult(
+                            DataStreamList.of(variable),
+                            DataStreamList.of(transformed.getSideOutput(outputTag)));
+                });
+        env.execute();

Review Comment:
   `BroadcastOutputTest.java` uses `BroadcastOutput` directly and `OperatorEpochWatermarkTrackerFactoryTest.java` uses `OperatorEpochWatermarkTracker` directly. But `OperatorUtilsTest` does not use `OperatorUtils` directly. 
   
   It seems better to move this test to e.g. `UnboundedStreamIterationITCase`.





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1168377341


##########
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/OperatorUtilsTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the {@link OperatorUtils}. */
+public class OperatorUtilsTest extends TestLogger {
+
+    @Test
+    public void testCreateWrappedOperatorConfig() throws Exception {
+        StreamOperatorFactory<IterationRecord<Integer>> wrapperFactory =
+                new WrapperOperatorFactory<>(
+                        SimpleOperatorFactory.of(new MockTwoInputStreamOperator()),
+                        new AllRoundOperatorWrapper<>());
+
+        OperatorID operatorId = new OperatorID();
+
+        new StreamTaskMailboxTestHarnessBuilder<>(
+                        TwoInputStreamTask::new,
+                        new IterationRecordTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
+                .build();
+    }
+
+    @Test
+    public void testCreateWrappedOperatorConfigWithSideOutput() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        final OutputTag<Integer> outputTag = new OutputTag("0", Types.INT) {};
+        DataStream<Integer> variableSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {});
+
+        Iterations.iterateUnboundedStreams(
+                DataStreamList.of(variableSource),
+                DataStreamList.of(),
+                (variableStreams, dataStreams) -> {
+                    DataStream<Integer> variable = variableStreams.get(0);
+                    SingleOutputStreamOperator transformed =
+                            variable.transform(
+                                    "side-output",
+                                    Types.INT,
+                                    new MockSideOutputOperator<>(outputTag));
+                    return new IterationBodyResult(
+                            DataStreamList.of(variable),
+                            DataStreamList.of(transformed.getSideOutput(outputTag)));
+                });
+        env.execute();

Review Comment:
   I added the check of the value of the output.
   
   I tried to move the test to flink-ml-tests, but that didn't make it simpler, so I prefer to leave the test in a directory that corresponds to the source code. Note that the Iteration module also has some test classes that run in a mini-cluster, such as `BroadcastOutputTest` and `OperatorEpochWatermarkTrackerFactoryTest`.





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1165403137


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {

Review Comment:
   Yes, `typeSerializerIn` and `typeSerializerOut` should be `IterationRecordSerializer` instances in the current usage. But since we are providing a public utility method, I think we should check that the serializers are not null and are instances of `IterationRecordSerializer`.
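
For illustration only, the kind of defensive check described in this comment could look roughly like the sketch below. It does not use the real Flink classes: `Serializer`, `PlainSerializer` and `RecordSerializer` are hypothetical stand-ins for `TypeSerializer`, a plain serializer such as `IntSerializer`, and `IterationRecordSerializer`, and `unwrap` is an assumed helper name that does not come from the PR.

```java
import java.util.Objects;

public class UnwrapCheckSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    interface Serializer {}

    /** Hypothetical stand-in for a plain serializer such as IntSerializer. */
    static final class PlainSerializer implements Serializer {}

    /** Hypothetical stand-in for IterationRecordSerializer, which wraps an inner serializer. */
    static final class RecordSerializer implements Serializer {
        private final Serializer inner;

        RecordSerializer(Serializer inner) {
            this.inner = Objects.requireNonNull(inner);
        }

        Serializer getInnerSerializer() {
            return inner;
        }
    }

    /** Returns the inner serializer, failing fast on null or on an unexpected type. */
    static Serializer unwrap(Serializer serializer) {
        Objects.requireNonNull(serializer, "serializer must not be null");
        if (!(serializer instanceof RecordSerializer)) {
            throw new IllegalArgumentException(
                    "Expected a RecordSerializer but got " + serializer.getClass().getName());
        }
        return ((RecordSerializer) serializer).getInnerSerializer();
    }

    public static void main(String[] args) {
        Serializer inner = new PlainSerializer();
        // A wrapper is unwrapped to the exact inner instance it was built with.
        System.out.println(unwrap(new RecordSerializer(inner)) == inner);
        try {
            unwrap(inner); // not a wrapper, so the check rejects it
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing fast with a descriptive exception is one option; silently passing through a serializer that is not wrapped is another, and the thread later settles on dropping the `instanceof` check altogether.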





[GitHub] [flink-ml] jiangxin369 commented on pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on PR #229:
URL: https://github.com/apache/flink-ml/pull/229#issuecomment-1501660711

   @zhipeng93 Thanks for the review, I've updated the PR.




[GitHub] [flink-ml] zhipeng93 commented on pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "zhipeng93 (via GitHub)" <gi...@apache.org>.
zhipeng93 commented on PR #229:
URL: https://github.com/apache/flink-ml/pull/229#issuecomment-1506787052

   The update LGTM except for the unresolved comment. @lindong28 Would you like to take a look?




[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

Posted by "zhipeng93 (via GitHub)" <gi...@apache.org>.
zhipeng93 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1162786211


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,24 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                inputs[i] =
+                        new NetworkInputConfig(
+                                ((IterationRecordSerializer<?>) typeSerializerIn)
+                                        .getInnerSerializer(),
+                                i);
+            }
+        }
+        wrappedConfig.setInputs(inputs);
+
+        TypeSerializer<?> typeSerializerOut = wrapperConfig.getTypeSerializerOut(cl);
+        wrappedConfig.setTypeSerializerOut(
+                ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());
+

Review Comment:
   Let's also unwrap the side-output type serializers here.
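
As a rough sketch of the idea, and not the code that was merged, unwrapping side-output serializers can be pictured as copying a tag-to-serializer mapping while replacing each wrapper with its inner serializer. Everything here is a hypothetical stand-in: the `Map` keyed by tag id stands in for however `StreamConfig` stores side-output serializers, and `Serializer`, `Plain`, `Wrapped` and `unwrapSideOutputs` are illustrative names rather than Flink APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SideOutputUnwrapSketch {

    /** Hypothetical stand-in for Flink's TypeSerializer. */
    interface Serializer {}

    /** Hypothetical plain serializer, identified by a name for readability. */
    static final class Plain implements Serializer {
        final String name;

        Plain(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for IterationRecordSerializer. */
    static final class Wrapped implements Serializer {
        final Serializer inner;

        Wrapped(Serializer inner) {
            this.inner = inner;
        }
    }

    /**
     * Copies the side-output serializers, keyed by output tag id, and replaces
     * each wrapper with its inner serializer. Entries that are not wrapped are
     * passed through unchanged (an assumption made for this sketch).
     */
    static Map<String, Serializer> unwrapSideOutputs(Map<String, Serializer> wrapperSideOutputs) {
        Map<String, Serializer> unwrapped = new LinkedHashMap<>();
        for (Map.Entry<String, Serializer> entry : wrapperSideOutputs.entrySet()) {
            Serializer serializer = entry.getValue();
            unwrapped.put(
                    entry.getKey(),
                    serializer instanceof Wrapped ? ((Wrapped) serializer).inner : serializer);
        }
        return unwrapped;
    }

    public static void main(String[] args) {
        Plain intSerializer = new Plain("int");
        Map<String, Serializer> wrapperConfig = new LinkedHashMap<>();
        wrapperConfig.put("0", new Wrapped(intSerializer));

        Map<String, Serializer> wrappedConfig = unwrapSideOutputs(wrapperConfig);
        // The wrapped operator now sees the inner serializer for tag "0".
        System.out.println(wrappedConfig.get("0") == intSerializer);
    }
}
```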


