Posted to issues@flink.apache.org by "lindong28 (via GitHub)" <gi...@apache.org> on 2023/04/14 07:01:36 UTC

[GitHub] [flink-ml] lindong28 commented on a diff in pull request #229: [FLINK-31255] Wraps the operator config about serializer

lindong28 commented on code in PR #229:
URL: https://github.com/apache/flink-ml/pull/229#discussion_r1166360085


##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +110,50 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);

Review Comment:
   Should we use `wrappedConfig` here?
   
   Maybe rename `wrapperConfig` to `config` so that it is easier to distinguish it from `wrappedConfig`.



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +110,50 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {
+                    inputs[i] =
+                            new NetworkInputConfig(
+                                    ((IterationRecordSerializer<?>) typeSerializerIn)
+                                            .getInnerSerializer(),
+                                    i);
+                }
+            }
+        }
+        wrappedConfig.setInputs(inputs);
+
+        TypeSerializer<?> typeSerializerOut = wrapperConfig.getTypeSerializerOut(cl);
+        if (typeSerializerOut instanceof IterationRecordSerializer) {

Review Comment:
   Should we throw an exception if `typeSerializerOut` is not an `IterationRecordSerializer`?
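   
   If so, a possible sketch of the fail-fast variant (the exception type and message are just placeholders):
   
   ```java
   // Assumption: inside the iteration wrapper the output serializer is always an
   // IterationRecordSerializer, so anything else indicates a wiring bug.
   if (!(typeSerializerOut instanceof IterationRecordSerializer)) {
       throw new IllegalStateException(
               "Expected IterationRecordSerializer as output serializer, but got "
                       + typeSerializerOut.getClass().getName());
   }
   wrappedConfig.setTypeSerializerOut(
           ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());
   ```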



##########
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/OperatorUtilsTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the {@link OperatorUtils}. */
+public class OperatorUtilsTest extends TestLogger {
+
+    @Test
+    public void testCreateWrappedOperatorConfig() throws Exception {
+        StreamOperatorFactory<IterationRecord<Integer>> wrapperFactory =
+                new WrapperOperatorFactory<>(
+                        SimpleOperatorFactory.of(new MockTwoInputStreamOperator()),
+                        new AllRoundOperatorWrapper<>());
+
+        OperatorID operatorId = new OperatorID();
+
+        new StreamTaskMailboxTestHarnessBuilder<>(
+                        TwoInputStreamTask::new,
+                        new IterationRecordTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
+                .build();
+    }
+
+    @Test
+    public void testCreateWrappedOperatorConfigWithSideOutput() throws Exception {

Review Comment:
   Do we need both tests? Would it be simpler to have one test that uses both the normal output and the side output?



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +110,50 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {
+                    inputs[i] =
+                            new NetworkInputConfig(
+                                    ((IterationRecordSerializer<?>) typeSerializerIn)
+                                            .getInnerSerializer(),
+                                    i);
+                }
+            }
+        }
+        wrappedConfig.setInputs(inputs);
+
+        TypeSerializer<?> typeSerializerOut = wrapperConfig.getTypeSerializerOut(cl);
+        if (typeSerializerOut instanceof IterationRecordSerializer) {
+            wrappedConfig.setTypeSerializerOut(
+                    ((IterationRecordSerializer<?>) typeSerializerOut).getInnerSerializer());
+        }
+
+        Stream.concat(
+                        wrapperConfig.getChainedOutputs(cl).stream(),
+                        wrapperConfig.getNonChainedOutputs(cl).stream())
+                .forEach(
+                        edge -> {
+                            OutputTag<?> outputTag = edge.getOutputTag();
+                            if (outputTag != null) {
+                                TypeSerializer<?> typeSerializerSideOut =
+                                        wrapperConfig.getTypeSerializerSideOut(outputTag, cl);
+                                if (typeSerializerOut instanceof IterationRecordSerializer) {

Review Comment:
   Why do we check `typeSerializerOut` here? Did you mean to check `typeSerializerSideOut`?
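   
   If the latter is intended, a minimal sketch mirroring the main-output handling (assuming `StreamConfig` exposes a matching side-output setter such as `setTypeSerializerSideOut`; please double-check against the Flink version in use):
   
   ```java
   if (typeSerializerSideOut instanceof IterationRecordSerializer) {
       // unwrap the inner serializer for this side output, as done for the main output
       wrappedConfig.setTypeSerializerSideOut(
               outputTag,
               ((IterationRecordSerializer<?>) typeSerializerSideOut).getInnerSerializer());
   }
   ```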



##########
flink-ml-iteration/src/main/java/org/apache/flink/iteration/operator/OperatorUtils.java:
##########
@@ -104,6 +107,33 @@ public static StreamConfig createWrappedOperatorConfig(
             }
         }
 
+        StreamConfig.InputConfig[] inputs = wrapperConfig.getInputs(cl);
+        for (int i = 0; i < inputs.length; ++i) {
+            if (inputs[i] instanceof NetworkInputConfig) {
+                TypeSerializer<?> typeSerializerIn =
+                        ((NetworkInputConfig) inputs[i]).getTypeSerializer();
+                if (typeSerializerIn instanceof IterationRecordSerializer) {

Review Comment:
   Should we throw an exception if the condition is false?
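   
   If yes, a possible sketch (assuming every network input of a wrapped operator is expected to carry iteration records):
   
   ```java
   if (!(typeSerializerIn instanceof IterationRecordSerializer)) {
       throw new IllegalStateException(
               "Unexpected input serializer at input " + i + ": "
                       + typeSerializerIn.getClass().getName());
   }
   inputs[i] =
           new NetworkInputConfig(
                   ((IterationRecordSerializer<?>) typeSerializerIn).getInnerSerializer(), i);
   ```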



##########
flink-ml-iteration/src/test/java/org/apache/flink/iteration/operator/OperatorUtilsTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.iteration.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.iteration.DataStreamList;
+import org.apache.flink.iteration.IterationBodyResult;
+import org.apache.flink.iteration.IterationRecord;
+import org.apache.flink.iteration.Iterations;
+import org.apache.flink.iteration.compile.DraftExecutionEnvironment;
+import org.apache.flink.iteration.operator.allround.AllRoundOperatorWrapper;
+import org.apache.flink.iteration.typeinfo.IterationRecordTypeInfo;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder;
+import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Tests the {@link OperatorUtils}. */
+public class OperatorUtilsTest extends TestLogger {
+
+    @Test
+    public void testCreateWrappedOperatorConfig() throws Exception {
+        StreamOperatorFactory<IterationRecord<Integer>> wrapperFactory =
+                new WrapperOperatorFactory<>(
+                        SimpleOperatorFactory.of(new MockTwoInputStreamOperator()),
+                        new AllRoundOperatorWrapper<>());
+
+        OperatorID operatorId = new OperatorID();
+
+        new StreamTaskMailboxTestHarnessBuilder<>(
+                        TwoInputStreamTask::new,
+                        new IterationRecordTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .addInput(new IterationRecordTypeInfo<>(BasicTypeInfo.INT_TYPE_INFO))
+                .setupOutputForSingletonOperatorChain(wrapperFactory, operatorId)
+                .build();
+    }
+
+    @Test
+    public void testCreateWrappedOperatorConfigWithSideOutput() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.getConfig().enableObjectReuse();
+
+        final OutputTag<Integer> outputTag = new OutputTag("0", Types.INT) {};
+        DataStream<Integer> variableSource =
+                env.addSource(new DraftExecutionEnvironment.EmptySource<Integer>() {});
+
+        Iterations.iterateUnboundedStreams(
+                DataStreamList.of(variableSource),
+                DataStreamList.of(),
+                (variableStreams, dataStreams) -> {
+                    DataStream<Integer> variable = variableStreams.get(0);
+                    SingleOutputStreamOperator transformed =
+                            variable.transform(
+                                    "side-output",
+                                    Types.INT,
+                                    new MockSideOutputOperator<>(outputTag));
+                    return new IterationBodyResult(
+                            DataStreamList.of(variable),
+                            DataStreamList.of(transformed.getSideOutput(outputTag)));
+                });
+        env.execute();

Review Comment:
   Can we additionally check the value of the output?
   
   Since the test actually starts a job in a mini-cluster, would it be simpler to add it to a test class in flink-ml-tests?
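   
   For the first point, a rough sketch of one way to assert on the output (hypothetical names such as `RESULTS`, `body`, and `expectedValues`; the current test has no such sink, and the expected values depend on what the body emits). In mini-cluster tests the collected values usually go into a static list so that the deserialized sink instances all write to the same place:
   
   ```java
   // static field on the test class
   // (imports: java.util.concurrent.CopyOnWriteArrayList,
   //  org.apache.flink.streaming.api.functions.sink.SinkFunction)
   private static final List<Integer> RESULTS = new CopyOnWriteArrayList<>();
   
   // capture the return value of the existing iterateUnboundedStreams(...) call,
   // with the existing lambda extracted into a local variable `body`
   DataStreamList outputs =
           Iterations.iterateUnboundedStreams(
                   DataStreamList.of(variableSource), DataStreamList.of(), body);
   outputs.<Integer>get(0)
           .addSink(
                   new SinkFunction<Integer>() {
                       @Override
                       public void invoke(Integer value, Context context) {
                           RESULTS.add(value);
                       }
                   });
   env.execute();
   assertEquals(expectedValues, RESULTS);
   ```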



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org