Posted to issues@flink.apache.org by "jiangxin369 (via GitHub)" <gi...@apache.org> on 2023/03/03 01:42:20 UTC

[GitHub] [flink-ml] jiangxin369 opened a new pull request, #218: [FLINK-31306] Add Servable for PipelineModel

jiangxin369 opened a new pull request, #218:
URL: https://github.com/apache/flink-ml/pull/218

   
   ## What is the purpose of the change
   
   Add a Servable for `PipelineModel`, so that a saved `PipelineModel` can be loaded as a `PipelineModelServable`.
   
   ## Brief change log
   
     - Refactor code structure of utility classes.
     - Add class `DataTypes` to simplify the usage of data types.
     - Add `PipelineModelServable` to support chaining a sequence of Servables.
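
To illustrate what chaining a sequence of Servables means, here is a minimal plain-Java sketch. The `Servable` interface, the use of `List<Integer>` in place of `DataFrame`, and the classes `SumServable` and `PipelineServable` are hypothetical stand-ins, not the Flink ML API; the deltas 10, 20 and 30 mirror the three `SumModel` stages used in `PipelineTest#testLoadServable`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch: a pipeline servable applies each child servable's transform in order. */
public class PipelineChainingSketch {

    /** Hypothetical stand-in for TransformerServable; a List<Integer> replaces DataFrame. */
    interface Servable {
        List<Integer> transform(List<Integer> input);
    }

    /** Hypothetical analogue of SumModelServable: adds a fixed delta to every value. */
    static class SumServable implements Servable {
        private final int delta;

        SumServable(int delta) {
            this.delta = delta;
        }

        @Override
        public List<Integer> transform(List<Integer> input) {
            List<Integer> output = new ArrayList<>(input.size());
            for (int value : input) {
                output.add(value + delta);
            }
            return output;
        }
    }

    /** Hypothetical analogue of PipelineModelServable: feeds each output into the next stage. */
    static class PipelineServable implements Servable {
        private final List<Servable> servables;

        PipelineServable(List<Servable> servables) {
            this.servables = servables;
        }

        @Override
        public List<Integer> transform(List<Integer> input) {
            List<Integer> current = input;
            for (Servable servable : servables) {
                current = servable.transform(current);
            }
            return current;
        }
    }

    public static void main(String[] args) {
        List<Servable> stages =
                Arrays.asList(new SumServable(10), new SumServable(20), new SumServable(30));
        Servable pipeline = new PipelineServable(stages);
        // Each value passes through all three stages: 1 + 10 + 20 + 30 = 61.
        System.out.println(pipeline.transform(Arrays.asList(1, 2, 3))); // prints [61, 62, 63]
    }
}
```

Each stage consumes the previous stage's output, so the pipeline's result equals applying the stages one after another.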
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (JavaDocs)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133222401


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {

Review Comment:
   `testPipelineModel` and `testPipeline` test the whole lifecycle of `Pipeline` and `PipelineModel`. This method, by contrast, does not test `PipelineModelServable`; it only tests the `loadServable` method of `PipelineModel`. `PipelineModelServable` itself is tested in `PipelineModelServableTest.java`, so I think it is better to keep the name `testLoadServable`.





[GitHub] [flink-ml] zhipeng93 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "zhipeng93 (via GitHub)" <gi...@apache.org>.
zhipeng93 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133434653


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   I agree with @lindong28 that calling `PipelineModel#loadServable()` directly is better for readability.
   
   However, I also noticed that we use reflection to test `saveAndReload` for all other Models/Transformers (e.g., KMeansTest [1]). Given that, using reflection to test `PipelineModel#loadServable()` here is OK with me. If we want better readability, we can update all the existing tests in a later PR.
   
   @jiangxin369 @lindong28 What do you think?
   
   
   [1] https://github.com/apache/flink-ml/blob/master/flink-ml-lib/src/test/java/org/apache/flink/ml/clustering/KMeansTest.java#L217 
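
For context, a reflection-based helper looks up a static load method by name at runtime, which is why an IDE's "find usages" cannot link the test to the method being tested. The sketch below contrasts that with a direct call; `FakeModel` and `loadViaReflection` are hypothetical names, and this is not the actual `TestUtils` implementation.

```java
import java.lang.reflect.Method;

/** Sketch: calling a static load method via reflection versus calling it directly. */
public class ReflectiveLoadSketch {

    /** Hypothetical model class with a static factory method. */
    static class FakeModel {
        final String path;

        private FakeModel(String path) {
            this.path = path;
        }

        static FakeModel load(String path) {
            return new FakeModel(path);
        }
    }

    /**
     * Reflective style: the method is named by a string, so an IDE's "find usages" on
     * FakeModel#load does not report this call site.
     */
    static <T> T loadViaReflection(Class<T> clazz, String methodName, String path)
            throws ReflectiveOperationException {
        Method method = clazz.getDeclaredMethod(methodName, String.class);
        // The receiver is null because the method is static.
        return clazz.cast(method.invoke(null, path));
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        FakeModel reflective = loadViaReflection(FakeModel.class, "load", "/tmp/model");
        // Direct style: the compiler links this call to FakeModel#load.
        FakeModel direct = FakeModel.load("/tmp/model");
        System.out.println(reflective.path.equals(direct.path)); // prints true
    }
}
```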





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133514787


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   I agree that we can also update the existing `TestUtils#saveAndReload` to make the code more readable.
   
   This can be achieved by passing, e.g., `KMeans#load` as a parameter of type `BiFunctionWithException<StreamTableEnvironment, String, T, IOException>` to `TestUtils#saveAndReload`.
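
A minimal sketch of this proposal, assuming a simplified setting: `FakeTableEnv` replaces `StreamTableEnvironment`, and `Saveable`, `FakeModel` and this `saveAndReload` signature are hypothetical. `BiFunctionWithException` is declared locally so the sketch compiles without Flink; it is meant to mirror the shape of Flink's `org.apache.flink.util.function.BiFunctionWithException`.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/** Sketch: passing the load function to a saveAndReload helper instead of using reflection. */
public class SaveAndReloadSketch {

    /** Local stand-in, meant to mirror Flink's BiFunctionWithException. */
    @FunctionalInterface
    interface BiFunctionWithException<T, U, R, E extends Throwable> {
        R apply(T t, U u) throws E;
    }

    /** Hypothetical environment with an in-memory "file system" keyed by path. */
    static class FakeTableEnv {
        final Map<String, Integer> storage = new HashMap<>();
    }

    /** Hypothetical stand-in for a stage that can be saved. */
    interface Saveable {
        void save(FakeTableEnv tEnv, String path);
    }

    /** Hypothetical model whose only model data is an int delta. */
    static class FakeModel implements Saveable {
        final int delta;

        FakeModel(int delta) {
            this.delta = delta;
        }

        @Override
        public void save(FakeTableEnv tEnv, String path) {
            tEnv.storage.put(path, delta);
        }

        static FakeModel load(FakeTableEnv tEnv, String path) throws IOException {
            Integer delta = tEnv.storage.get(path);
            if (delta == null) {
                throw new IOException("No model saved at " + path);
            }
            return new FakeModel(delta);
        }
    }

    /** Saves the stage, then reloads it with the caller-supplied load function. */
    static <T extends Saveable> T saveAndReload(
            FakeTableEnv tEnv,
            T stage,
            String path,
            BiFunctionWithException<FakeTableEnv, String, T, IOException> loadFunc)
            throws IOException {
        stage.save(tEnv, path);
        return loadFunc.apply(tEnv, path);
    }

    public static void main(String[] args) throws IOException {
        FakeTableEnv tEnv = new FakeTableEnv();
        // FakeModel::load is an ordinary method reference, so IDE navigation finds this usage.
        FakeModel loaded = saveAndReload(tEnv, new FakeModel(10), "/tmp/model", FakeModel::load);
        System.out.println(loaded.delta); // prints 10
    }
}
```

Because the loader is passed as a method reference (`FakeModel::load`), the call site is statically linked to the load method, so IDE navigation works as it would for a direct call.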





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1126126121


##########
flink-ml-servable-core/src/main/java/org/apache/flink/ml/servable/types/DataTypes.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.types;
+
+/** This class gives access to the most common types that are used to define DataFrames. */
+public class DataTypes {
+
+    public static final ScalarType BOOLEAN = new ScalarType(BasicType.BOOLEAN);
+
+    public static final ScalarType BYTE = new ScalarType(BasicType.BYTE);
+
+    public static final ScalarType SHORT = new ScalarType(BasicType.SHORT);
+
+    public static final ScalarType INT = new ScalarType(BasicType.INT);
+
+    public static final ScalarType LONG = new ScalarType(BasicType.LONG);
+
+    public static final ScalarType FLOAT = new ScalarType(BasicType.FLOAT);
+
+    public static final ScalarType DOUBLE = new ScalarType(BasicType.DOUBLE);
+
+    public static final ScalarType STRING = new ScalarType(BasicType.STRING);
+
+    public static final ScalarType BYTE_STRING = new ScalarType(BasicType.BYTE_STRING);
+
+    public static VectorType VECTOR(BasicType elementType) {

Review Comment:
   I named this `VECTOR` following the naming of `Types` [1] in Flink; IMO it may be more familiar to users.
   
   [1] https://github.com/apache/flink/blob/84f532e65498164bc03529dc387a852f0e18d31d/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/Types.java#L212
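
The naming pattern discussed here can be sketched as follows: scalar types are exposed as constants, and parameterized types as upper-case static factory methods, so a call site such as `DataTypes.VECTOR(BasicType.DOUBLE)` reads like the constants next to it (Flink's `Types` class uses upper-case factories such as `Types.ROW(...)` and `Types.MAP(...)` in the same way). The nested classes are simplified stand-ins; in particular, the `VectorType(BasicType)` constructor is an assumption.

```java
/** Sketch: constants for scalar types, an upper-case static factory for vector types. */
public class DataTypesSketch {

    /** Hypothetical subset of BasicType. */
    enum BasicType {
        INT,
        DOUBLE
    }

    /** Simplified stand-in for ScalarType. */
    static class ScalarType {
        final BasicType basicType;

        ScalarType(BasicType basicType) {
            this.basicType = basicType;
        }
    }

    /** Simplified stand-in for VectorType; the constructor signature is an assumption. */
    static class VectorType {
        final BasicType elementType;

        VectorType(BasicType elementType) {
            this.elementType = elementType;
        }
    }

    /** Mirrors the shape of DataTypes in the diff above. */
    static final class DataTypes {
        static final ScalarType INT = new ScalarType(BasicType.INT);

        static final ScalarType DOUBLE = new ScalarType(BasicType.DOUBLE);

        private DataTypes() {}

        // Upper-case on purpose, so the call site reads like the constants: DataTypes.VECTOR(...).
        static VectorType VECTOR(BasicType elementType) {
            return new VectorType(elementType);
        }
    }

    public static void main(String[] args) {
        ScalarType scalar = DataTypes.INT;
        VectorType vector = DataTypes.VECTOR(BasicType.DOUBLE);
        System.out.println(scalar.basicType + " " + vector.elementType); // prints INT DOUBLE
    }
}
```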





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133223131


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   I acknowledge the drawback that developers cannot directly find, via the IDE, where this method is tested, but I'd rather keep the code clean by wrapping the common logic in a util method, like `TestUtils#saveAndReload`. I believe a developer can tell from the method name that `loadServable` is tested in `testLoadServable`.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1130375623


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/ExampleStages.java:
##########
@@ -110,6 +111,10 @@ public static SumModel load(StreamTableEnvironment tEnv, String path) throws IOE
             SumModel model = ReadWriteUtils.loadStageParam(path);
             return model.setModelData(modelDataTable);
         }
+
+        public static SumModelServable loadServable(String path) throws IOException {

Review Comment:
   Can we test this method directly (rather than via reflection), similar to the existing `ReadWriteUtilsTest#testModelSaveLoad`? This would allow developers to quickly identify, via the IDE, where this method is tested.
   
   And it is also more consistent with the idea that we want to test every public API in the same way as how users are going to use it.



##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/builder/ExampleServables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.builder;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Defines Servable subclasses to be used in unit tests. */
+public class ExampleServables {
+
+    /**
+     * A {@link TransformerServable} subclass that increments every value in the input dataframe by
+     * `delta` and outputs the resulting values.
+     */
+    public static class SumModelServable implements ModelServable<SumModelServable> {
+
+        private static final String COL_NAME = "input";
+
+        private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+        private int delta;
+
+        public SumModelServable() {
+            ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+        }
+
+        @Override
+        public DataFrame transform(DataFrame input) {
+            List<Row> outputRows = new ArrayList<>();
+            for (Row row : input.collect()) {
+                assert row.size() == 1;
+                int originValue = (Integer) row.get(0);
+                outputRows.add(new Row(Collections.singletonList(originValue + delta)));
+            }
+            return new DataFrame(
+                    Collections.singletonList(COL_NAME),
+                    Collections.singletonList(DataTypes.INT),
+                    outputRows);
+        }
+
+        @Override
+        public Map<Param<?>, Object> getParamMap() {
+            return paramMap;
+        }
+
+        public static SumModelServable load(String path) throws IOException {
+            SumModelServable servable =
+                    ServableReadWriteUtils.loadServableParam(path, SumModelServable.class);
+
+            try (InputStream inputStream = ServableReadWriteUtils.loadModelData(path)) {
+                DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                        new DataInputViewStreamWrapper(inputStream);
+                int delta = IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);

Review Comment:
   Would it be simpler to do the following and remove the method `setModelData()`?
   
   ```
   servable.delta = IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
   return servable;
   ```
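
A minimal sketch of the suggested simplification, using only the JDK: since `load` is a static method of the servable class itself, it may assign the private `delta` field directly, so no `setModelData()` method is needed. `DataInputStream#readInt` stands in for `IntSerializer.INSTANCE.deserialize(...)`, on the assumption that both read a 4-byte big-endian int; loading of the servable's params is omitted, and `SumServable` and `serializeDelta` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

/** Sketch: a static load method that sets the private model-data field directly. */
public class DirectFieldLoadSketch {

    /** Hypothetical, simplified SumModelServable without a setModelData() method. */
    static class SumServable {
        private int delta;

        int apply(int value) {
            return value + delta;
        }

        /** Reads the model data and assigns the private field (allowed inside the class). */
        static SumServable load(InputStream modelData) throws IOException {
            SumServable servable = new SumServable();
            try (DataInputStream in = new DataInputStream(modelData)) {
                servable.delta = in.readInt();
            }
            return servable;
        }
    }

    /** Writes an int the way DataOutput#writeInt does (4 bytes, big-endian). */
    static byte[] serializeDelta(int delta) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(delta);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SumServable servable = SumServable.load(new ByteArrayInputStream(serializeDelta(10)));
        System.out.println(servable.apply(1)); // prints 11
    }
}
```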



##########
flink-ml-servable-core/src/main/java/org/apache/flink/ml/util/ServableReadWriteUtils.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.builder.PipelineModelServable;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.ml.util.FileUtils.loadMetadata;
+
+/** Utility methods for loading Servables. */
+public class ServableReadWriteUtils {
+
+    /**
+     * Loads the servables of a {@link PipelineModelServable} from the given path.
+     *
+     * <p>The method throws RuntimeException if the expectedClassName is not empty AND it does not
+     * match the className of the previously saved PipelineModel.
+     *
+     * @param path The parent directory to load the PipelineModelServable metadata and its
+     *     servables.
+     * @return A list of servables.
+     */
+    public static List<TransformerServable<?>> loadPipelineOfServables(String path)

Review Comment:
   How about naming it `loadPipeline` for consistency with `ReadWriteUtils#loadPipeline`?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api;
+
+import org.apache.flink.ml.api.ExampleStages.SumModel;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.builder.ExampleServables.SumModelServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.ml.servable.TestUtils.compareDataFrame;
+import static org.apache.flink.ml.util.TestUtils.saveAndLoadServable;
+
+/** Tests the behavior of integration with Servables. */
+public class IntegrateWithServableTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+
+    private static final DataFrame INPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(1)),
+                            new Row(Collections.singletonList(2)),
+                            new Row(Collections.singletonList(3))));
+
+    private static final DataFrame EXPECTED_OUTPUT =
+            new DataFrame(
+                    Collections.singletonList("input"),
+                    Collections.singletonList(DataTypes.INT),
+                    Arrays.asList(
+                            new Row(Collections.singletonList(11)),
+                            new Row(Collections.singletonList(12)),
+                            new Row(Collections.singletonList(13))));
+
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Before
+    public void before() {
+        StreamExecutionEnvironment env = TestUtils.getExecutionEnvironment();
+        tEnv = StreamTableEnvironment.create(env);
+    }
+
+    @Test
+    public void testSaveModelAndLoadAsServable() throws Exception {

Review Comment:
   Would it be simpler to use `testSaveModelLoadServable()`?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+    @Test
+    public void testSaveModelAndLoadAsServable() throws Exception {
+        SumModel model = new SumModel().setModelData(tEnv.fromValues(10));
+
+        SumModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());
+
+        DataFrame output = servable.transform(INPUT);
+
+        compareDataFrame(EXPECTED_OUTPUT, output);
+    }
+
+    @Test
+    public void testGetModelDataAndSetToServable() throws Exception {

Review Comment:
   Would it be simpler to name it `testSetModelData()`?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+    @Test
+    public void testSaveModelAndLoadAsServable() throws Exception {
+        SumModel model = new SumModel().setModelData(tEnv.fromValues(10));
+
+        SumModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());
+
+        DataFrame output = servable.transform(INPUT);
+
+        compareDataFrame(EXPECTED_OUTPUT, output);
+    }
+
+    @Test
+    public void testGetModelDataAndSetToServable() throws Exception {
+        SumModel model = new SumModel().setModelData(tEnv.fromValues(10));
+        Table modelDataTable = model.getModelData()[0];
+        Integer modelData =
+                tEnv.toDataStream(modelDataTable)
+                        .map(x -> (int) x.getField(0))
+                        .executeAndCollect()
+                        .next();
+
+        byte[] serializedModelData = Serializer.serialize(modelData);

Review Comment:
   Serializing the model data as an object stream is not very efficient, similar to serialization using Kryo. Also, we don't expect user code to be directly aware of the internal data structure of the model data.
   
   It would be useful for the tests to show the best practice for developing and using servables.
   
   Here is one possible way to convert a Table from `Model#getModelData` into an InputStream that can be provided to `ModelServable#setModelData`:
   
   1. Table of model data objects (in this case, each model data object is an integer)
   2. Table of byte[]
   3. List of byte[]
   4. InputStream of byte[]
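A minimal, Flink-free sketch of steps 2 to 4 follows. It assumes each model data object is encoded with `java.io.DataOutputStream#writeInt` (4 bytes, big-endian, which should match what Flink's `IntSerializer` writes through a `DataOutputView`); the class and method names are hypothetical. In the real test, the List in step 3 would come from collecting the byte[] table, for example via `tEnv.toDataStream(...)` and `executeAndCollect()` as the diff above already does for the integer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper illustrating steps 2-4: object -> byte[] -> List<byte[]> -> InputStream. */
final class ModelDataBytes {

    private ModelDataBytes() {}

    /** Step 1 -> 2: encodes one model data object (here an int) as 4 big-endian bytes. */
    static byte[] encode(int modelData) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(modelData);
        } catch (IOException e) {
            // Writing to an in-memory buffer does not fail in practice; rethrow unchecked.
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Step 3 -> 4: exposes the collected byte arrays as one InputStream, read back to back. */
    static InputStream toInputStream(List<byte[]> records) {
        List<InputStream> streams = new ArrayList<>();
        for (byte[] record : records) {
            streams.add(new ByteArrayInputStream(record));
        }
        return new SequenceInputStream(Collections.enumeration(streams));
    }

    /** What a servable's setModelData(InputStream...) might do: decode the next int. */
    static int decode(InputStream input) throws IOException {
        // DataInputStream does not buffer, so repeated calls consume consecutive records.
        return new DataInputStream(input).readInt();
    }
}
```

Because `SequenceInputStream` concatenates the arrays, several records can be decoded from the single stream in order, and the servable never needs to know how the table was collected.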
   
   
   
   
   



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api;
+
+import org.apache.flink.ml.api.ExampleStages.SumModel;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.builder.ExampleServables.SumModelServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.ml.servable.TestUtils.compareDataFrame;
+import static org.apache.flink.ml.util.TestUtils.saveAndLoadServable;
+
+/** Tests the behavior of integration with Servables. */
+public class IntegrateWithServableTest extends AbstractTestBase {

Review Comment:
   Would it be simpler to name it `ServableTest`?



##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/builder/ExampleServables.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.builder;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.ModelServable;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Defines Servable subclasses to be used in unit tests. */
+public class ExampleServables {
+
+    /**
+     * A {@link TransformerServable} subclass that increments every value in the input dataframe by
+     * `delta` and outputs the resulting values.
+     */
+    public static class SumModelServable implements ModelServable<SumModelServable> {
+
+        private static final String COL_NAME = "input";
+
+        private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+        private int delta;
+
+        public SumModelServable() {
+            ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+        }
+
+        @Override
+        public DataFrame transform(DataFrame input) {
+            List<Row> outputRows = new ArrayList<>();
+            for (Row row : input.collect()) {
+                assert row.size() == 1;
+                int originValue = (Integer) row.get(0);
+                outputRows.add(new Row(Collections.singletonList(originValue + delta)));
+            }
+            return new DataFrame(
+                    Collections.singletonList(COL_NAME),
+                    Collections.singletonList(DataTypes.INT),
+                    outputRows);
+        }
+
+        @Override
+        public Map<Param<?>, Object> getParamMap() {
+            return paramMap;
+        }
+
+        public static SumModelServable load(String path) throws IOException {
+            SumModelServable servable =
+                    ServableReadWriteUtils.loadServableParam(path, SumModelServable.class);
+
+            try (InputStream inputStream = ServableReadWriteUtils.loadModelData(path)) {
+                DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                        new DataInputViewStreamWrapper(inputStream);
+                int delta = IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
+                return servable.setDelta(delta);
+            }
+        }
+
+        public SumModelServable setDelta(int delta) {
+            this.delta = delta;
+            return this;
+        }
+
+        public SumModelServable setModelData(InputStream... modelDataInputs) throws IOException {
+            assert modelDataInputs.length == 1;

Review Comment:
   Would the following code be more consistent with the existing way of checking argument length?
   
   ```
   Preconditions.checkArgument(modelDataInputs.length == 1);
   ```
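One reason to prefer an explicit check: a Java `assert` is skipped unless the JVM runs with `-ea`, whereas `Preconditions.checkArgument` always throws `IllegalArgumentException` when the condition is false. The sketch below uses a hypothetical local `checkArgument` that mirrors that behavior so it runs without Flink on the classpath; in flink-ml code one would call `org.apache.flink.util.Preconditions.checkArgument` directly.

```java
import java.io.InputStream;

/** Hypothetical sketch of an always-on argument check for setModelData-style methods. */
final class ArgumentChecks {

    private ArgumentChecks() {}

    /** Stand-in mirroring Preconditions.checkArgument(boolean): throws if the condition is false. */
    static void checkArgument(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException();
        }
    }

    /** Unlike `assert`, this check still runs when assertions are disabled. */
    static InputStream singleModelDataInput(InputStream... modelDataInputs) {
        checkArgument(modelDataInputs.length == 1);
        return modelDataInputs[0];
    }
}
```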



##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/TestUtils.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable;
+
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Utility methods for tests. */
+public class TestUtils {
+
+    /** Compares two dataframes. */
+    public static void compareDataFrame(DataFrame first, DataFrame second) {

Review Comment:
   Would it be simpler to name it `compare(...)`?
   
   Should this method return `int` for consistency with other `compare` methods?
   
   If this method throws an exception when the given data frames are not equal, it seems better to name it `assertEquals(...)`.
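A rough sketch of what an `assertEquals`-style helper might look like. `SimpleFrame` is a hypothetical stand-in for the servable `DataFrame` (column names plus rows), used only so the example compiles on its own; a real version would also compare data types and read rows through `DataFrame#collect()`.

```java
import java.util.List;
import java.util.Objects;

/** Hypothetical assertion helper following JUnit's assertEquals(expected, actual) convention. */
final class DataFrameAssertions {

    private DataFrameAssertions() {}

    /** Hypothetical stand-in for DataFrame: column names and rows of values. */
    static final class SimpleFrame {
        final List<String> columnNames;
        final List<List<Object>> rows;

        SimpleFrame(List<String> columnNames, List<List<Object>> rows) {
            this.columnNames = columnNames;
            this.rows = rows;
        }
    }

    /** Returns normally if the frames are equal; otherwise throws describing the first difference. */
    static void assertEquals(SimpleFrame expected, SimpleFrame actual) {
        if (!expected.columnNames.equals(actual.columnNames)) {
            throw new AssertionError(
                    "Column names differ: expected " + expected.columnNames
                            + " but was " + actual.columnNames);
        }
        if (expected.rows.size() != actual.rows.size()) {
            throw new AssertionError(
                    "Row counts differ: expected " + expected.rows.size()
                            + " but was " + actual.rows.size());
        }
        for (int i = 0; i < expected.rows.size(); i++) {
            if (!Objects.equals(expected.rows.get(i), actual.rows.get(i))) {
                throw new AssertionError(
                        "Row " + i + " differs: expected " + expected.rows.get(i)
                                + " but was " + actual.rows.get(i));
            }
        }
    }
}
```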
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133042094


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {

Review Comment:
   Nit: can we name it `testPipelineModelServable` for consistency with `testPipelineModel` and `testPipeline` in this class?





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1126103060


##########
flink-ml-servable-core/src/main/java/org/apache/flink/ml/util/FileUtils.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.Map;
+
+/** Utility methods for file operations. */
+public class FileUtils {
+
+    /** Saves a given string to the specified file. */
+    public static void saveToFile(String pathStr, String content, boolean isOverwrite)
+            throws IOException {
+        Path path = new Path(pathStr);
+
+        // Creates parent directories if not already created.
+        FileSystem fs = mkdirs(path.getParent());
+
+        FileSystem.WriteMode writeMode = FileSystem.WriteMode.OVERWRITE;
+        if (!isOverwrite) {
+            writeMode = FileSystem.WriteMode.NO_OVERWRITE;
+            if (fs.exists(path)) {
+                throw new IOException("File " + path + " already exists.");
+            }
+        }
+        try (BufferedWriter writer =
+                new BufferedWriter(new OutputStreamWriter(fs.create(path, writeMode)))) {
+            writer.write(content);
+        }
+    }
+
+    public static FileSystem mkdirs(Path path) throws IOException {
+        FileSystem fs = path.getFileSystem();
+        fs.mkdirs(path);
+        return fs;
+    }
+
+    /**
+     * Loads the metadata from the metadata file under the given path.
+     *
+     * <p>The method throws RuntimeException if the expectedClassName is not empty AND it does not
+     * match the className of the previously saved stage.
+     *
+     * @param path The parent directory of the metadata file to read from.
+     * @param expectedClassName The expected class name of the stage.
+     * @return A map from metadata name to metadata value.
+     */
+    public static Map<String, ?> loadMetadata(String path, String expectedClassName)
+            throws IOException {
+        Path metadataPath = new Path(path, "metadata");
+        FileSystem fs = metadataPath.getFileSystem();
+
+        StringBuilder buffer = new StringBuilder();
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(metadataPath)))) {
+            String line;
+            while ((line = br.readLine()) != null) {
+                if (!line.startsWith("#")) {
+                    buffer.append(line);
+                }
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        Map<String, ?> result = JsonUtils.OBJECT_MAPPER.readValue(buffer.toString(), Map.class);
+
+        String className = (String) result.get("className");
+        if (!expectedClassName.isEmpty() && !expectedClassName.equals(className)) {
+            throw new RuntimeException(
+                    "Class name "
+                            + className
+                            + " does not match the expected class name "
+                            + expectedClassName
+                            + ".");
+        }
+
+        return result;
+    }
+
+    // Returns a string with value {parentPath}/stages/{stageIdx}, where the stageIdx is prefixed
+    // with zero or more `0` to have the same length as numStages. The resulting string can be
+    // used as the directory to save a stage of the Pipeline or PipelineModel.
+    public static String getPathForPipelineStage(int stageIdx, int numStages, String parentPath) {
+        String format =
+                String.format("stages%s%%0%dd", File.separator, String.valueOf(numStages).length());
+        String fileName = String.format(format, stageIdx);
+        return new Path(parentPath, fileName).toString();
+    }
+
+    /** Returns a subdirectory of the given path for saving/loading model data. */
+    public static Path getDataPath(String path) {
+        return new Path(path, "data");
+    }
+
+    /**
+     * Opens an FSDataInputStream to read the model data file in the directory. Only one model data
+     * file is expected to be in the directory.
+     *
+     * @param path The parent directory of the model data file.
+     * @return A FSDataInputStream to read the model data.
+     */
+    public static FSDataInputStream getModelDataInputStream(Path path) throws IOException {

Review Comment:
   Yes, you are right. In addition, I moved this function to `ServableReadWriteUtils.java` because I expect all util methods required for implementing an algorithm to be located in `ServableReadWriteUtils.java`, while `FileUtils.java` is only for the framework.





[GitHub] [flink-ml] jiangxin369 commented on pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on PR #218:
URL: https://github.com/apache/flink-ml/pull/218#issuecomment-1463393272

   @lindong28 Thanks for the review, I've updated the PR.




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133237792


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   I am not sure developers can "easily" notice that `PipelineModel#loadServable` is tested in `PipelineTest#testLoadServable`. IMO it is much easier for developers to find where this method is tested via the IDE if it is directly invoked here.
   
   Maybe we can involve a third person to make the choice. @zhipeng93 can you comment on which solution you prefer?





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1126151088


##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/builder/ExampleServables.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.builder;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.FileUtils;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Defines Servable subclasses to be used in unit tests. */
+public class ExampleServables {
+
+    /**
+     * A {@link TransformerServable} subclass that increments every value in the input dataframe by
+     * `delta` and outputs the resulting values.
+     */
+    public static class SumModelServable implements TransformerServable<SumModelServable> {
+
+        private static final String COL_NAME = "input";
+
+        private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+        private int delta;
+
+        public SumModelServable() {
+            ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+        }
+
+        @Override
+        public DataFrame transform(DataFrame input) {
+            List<Row> outputRows = new ArrayList<>();
+            for (Row row : input.collect()) {
+                assert row.size() == 1;
+                int originValue = (Integer) row.get(0);
+                outputRows.add(new Row(Collections.singletonList(originValue + delta)));
+            }
+            return new DataFrame(
+                    Collections.singletonList(COL_NAME),
+                    Collections.singletonList(DataTypes.INT),
+                    outputRows);
+        }
+
+        @Override
+        public Map<Param<?>, Object> getParamMap() {
+            return paramMap;
+        }
+
+        public static SumModelServable load(String path) throws IOException {
+            SumModelServable servable =
+                    ServableReadWriteUtils.loadServableParam(path, SumModelServable.class);
+
+            Path modelDataPath = FileUtils.getDataPath(path);
+            try (FSDataInputStream fsDataInputStream =
+                    FileUtils.getModelDataInputStream(modelDataPath)) {
+                DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                        new DataInputViewStreamWrapper(fsDataInputStream);
+                int delta = IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
+                servable.setDelta(delta);
+            }
+            return servable;
+        }
+
+        public SumModelServable setDelta(int delta) {

Review Comment:
   I'm not sure I got your point. I added the `setDelta` method just to construct multiple different `Transformer`s, chain them into a pipeline, and then run the `saveLoadAndTransform` tests. A setter is simpler for this than `setModelData`.
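The setter-based approach can be sketched as follows. `IntServable`, `SumServable` and `ChainedServable` are hypothetical, simplified stand-ins for `TransformerServable`, `SumModelServable` and `PipelineModelServable`, working on a `List<Integer>` instead of a `DataFrame`; that a pipeline servable feeds each stage's output into the next is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: building several servables via a setter and chaining them. */
final class ServableChainSketch {

    private ServableChainSketch() {}

    /** Stand-in for a single-stage servable. */
    interface IntServable {
        List<Integer> transform(List<Integer> input);
    }

    /** Adds `delta` to every value, like SumModelServable. */
    static final class SumServable implements IntServable {
        private int delta;

        SumServable setDelta(int delta) {
            this.delta = delta;
            return this;
        }

        @Override
        public List<Integer> transform(List<Integer> input) {
            List<Integer> output = new ArrayList<>(input.size());
            for (int value : input) {
                output.add(value + delta);
            }
            return output;
        }
    }

    /** Applies the stages in order, passing each stage's output to the next. */
    static final class ChainedServable implements IntServable {
        private final List<IntServable> stages;

        ChainedServable(List<IntServable> stages) {
            this.stages = stages;
        }

        @Override
        public List<Integer> transform(List<Integer> input) {
            List<Integer> current = input;
            for (IntServable stage : stages) {
                current = stage.transform(current);
            }
            return current;
        }
    }
}
```

With deltas 10, 20 and 30 (the values used in `testLoadServable`), an input of `[1, 2, 3]` becomes `[61, 62, 63]`; each stage needs only one `setDelta` call rather than a serialized model data stream.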





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1130430131


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/IntegrateWithServableTest.java:
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.api;
+
+import org.apache.flink.ml.api.ExampleStages.SumModel;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.builder.ExampleServables.SumModelServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.Serializer;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.flink.ml.servable.TestUtils.compareDataFrame;
+import static org.apache.flink.ml.util.TestUtils.saveAndLoadServable;
+
+/** Tests the behavior of integration with Servables. */
+public class IntegrateWithServableTest extends AbstractTestBase {

Review Comment:
   Would it be simpler to name it `ServableTest`?





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133237792


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   I am not sure developers can "easily" notice that this `PipelineModel#loadServable` is tested in `PipelineTest#testLoadServable`. IMO it is much easier for developers to find where this method is tested via the IDE if it is invoked directly here.
   
   Maybe we can let a third person break the tie. @zhipeng93, can you comment on which solution you prefer?





[GitHub] [flink-ml] lindong28 merged pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 merged PR #218:
URL: https://github.com/apache/flink-ml/pull/218




[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133223131


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   Compared with the disadvantage that users cannot directly tell where the method is tested, I'd rather keep the code clean by wrapping the common logic in a util method, like `TestUtils#saveAndReload`. I believe a developer can tell from the method name that `loadServable` is tested in `testLoadServable`. What do you think?





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133364498


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {

Review Comment:
   I agree with you that this test covers the whole lifecycle of `PipelineModelServable` and can be renamed to `testPipelineModelServable`. As for `PipelineModelServableTest.java`, it tests `PipelineModelServable` itself without reloading it from a `PipelineModel`, so I think we can keep it. A similar case is that we have both `testTransform` and `testSaveLoadAndTransform` for each Transformer in flink-ml-lib.





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133222401


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {

Review Comment:
   `testPipelineModel` and `testPipeline` test the whole lifecycle of `Pipeline` and `PipelineModel`, whereas this method does not test `PipelineModelServable`; it tests the `loadServable` function of `PipelineModel`. `PipelineModelServable` itself is tested in `PipelineModelServableTest.java`, so I think it is better to keep the name `testLoadServable`.





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1126115803


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/ExampleStages.java:
##########
@@ -110,6 +111,10 @@ public static SumModel load(StreamTableEnvironment tEnv, String path) throws IOE
             SumModel model = ReadWriteUtils.loadStageParam(path);
             return model.setModelData(modelDataTable);
         }
+
+        public static SumModelServable loadServable(String path) throws IOException {

Review Comment:
   I'm afraid not. `SumModel` exists only for testing `PipelineModelServable`, and its `loadServable` function is already covered by `PipelineTest#testLoadServable`. We will, however, add such tests for every algorithm servable, e.g. `LogisticRegressionModelServable`.





[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133589007


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   I agree with the solution and I've updated the PR.





[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1125428753


##########
flink-ml-core/src/main/java/org/apache/flink/ml/builder/PipelineModel.java:
##########
@@ -82,6 +85,33 @@ public static PipelineModel load(StreamTableEnvironment tEnv, String path) throw
                 ReadWriteUtils.loadPipeline(tEnv, path, PipelineModel.class.getName()));
     }
 
+    public static PipelineModelServable loadServable(String path) throws IOException {
+        return PipelineModelServable.load(path);
+    }
+
+    /**
+     * Whether all stages in the pipeline have corresponding {@link TransformerServable} so that the
+     * PipelineModel can be turned into a TransformerServable and used in an online inference
+     * program.
+     *
+     * @return true if all stages have corresponding TransformerServable, false if not.
+     */
+    public boolean supportServable() {
+        for (Stage stage : stages) {

Review Comment:
   nits: `Stage stage : stages` -> `Stage<?> stage : stages`
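   The quoted `supportServable()` body is cut off right after the cast, so how it decides that a Transformer "has a corresponding servable" is not visible here. The sketch below applies both wildcard nits from this review (`Stage<?>` in the loop and `Transformer<?>` in the cast) and assumes, purely for illustration, that the check looks for a public static `loadServable(String)` method via reflection, which is the convention `PipelineModel.loadServable` and `SumModel.loadServable` follow in this PR. `Stage`, `Transformer`, and the three example classes are minimal hypothetical stand-ins, not the Flink ML interfaces.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.List;

public class SupportServableSketch {

    // Hypothetical stand-ins for the Flink ML Stage and Transformer interfaces.
    interface Stage<T extends Stage<T>> {}

    interface Transformer<T extends Transformer<T>> extends Stage<T> {}

    // A Transformer that follows the assumed static loadServable(String) convention.
    static class ServableModel implements Transformer<ServableModel> {
        public static Object loadServable(String path) {
            return new Object();
        }
    }

    // A Transformer without a servable counterpart.
    static class PlainModel implements Transformer<PlainModel> {}

    // A Stage that is not a Transformer, e.g. an Estimator.
    static class PlainEstimator implements Stage<PlainEstimator> {}

    /** Returns true only if every stage is a Transformer exposing a static loadServable(String). */
    static boolean supportServable(List<Stage<?>> stages) {
        for (Stage<?> stage : stages) {
            if (!(stage instanceof Transformer<?>)) {
                return false;
            }
            Transformer<?> transformer = (Transformer<?>) stage;
            try {
                // Assumption: "has a servable" means a public static loadServable(String) exists.
                Method method = transformer.getClass().getMethod("loadServable", String.class);
                if (!Modifier.isStatic(method.getModifiers())) {
                    return false;
                }
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Stage<?>> servable = Arrays.asList(new ServableModel(), new ServableModel());
        List<Stage<?>> withEstimator = Arrays.asList(new PlainEstimator(), new ServableModel());
        List<Stage<?>> withPlainModel = Arrays.asList(new ServableModel(), new PlainModel());
        System.out.println(supportServable(servable)); // true
        System.out.println(supportServable(withEstimator)); // false
        System.out.println(supportServable(withPlainModel)); // false
    }
}
```

   The wildcard types change nothing at runtime; they only remove raw-type warnings while keeping the loop and the cast type-safe.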



##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/builder/ExampleServables.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.builder;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.types.DataTypes;
+import org.apache.flink.ml.util.FileUtils;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ServableReadWriteUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Defines Servable subclasses to be used in unit tests. */
+public class ExampleServables {
+
+    /**
+     * A {@link TransformerServable} subclass that increments every value in the input dataframe by
+     * `delta` and outputs the resulting values.
+     */
+    public static class SumModelServable implements TransformerServable<SumModelServable> {
+
+        private static final String COL_NAME = "input";
+
+        private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+        private int delta;
+
+        public SumModelServable() {
+            ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+        }
+
+        @Override
+        public DataFrame transform(DataFrame input) {
+            List<Row> outputRows = new ArrayList<>();
+            for (Row row : input.collect()) {
+                assert row.size() == 1;
+                int originValue = (Integer) row.get(0);
+                outputRows.add(new Row(Collections.singletonList(originValue + delta)));
+            }
+            return new DataFrame(
+                    Collections.singletonList(COL_NAME),
+                    Collections.singletonList(DataTypes.INT),
+                    outputRows);
+        }
+
+        @Override
+        public Map<Param<?>, Object> getParamMap() {
+            return paramMap;
+        }
+
+        public static SumModelServable load(String path) throws IOException {
+            SumModelServable servable =
+                    ServableReadWriteUtils.loadServableParam(path, SumModelServable.class);
+
+            Path modelDataPath = FileUtils.getDataPath(path);
+            try (FSDataInputStream fsDataInputStream =
+                    FileUtils.getModelDataInputStream(modelDataPath)) {
+                DataInputViewStreamWrapper dataInputViewStreamWrapper =
+                        new DataInputViewStreamWrapper(fsDataInputStream);
+                int delta = IntSerializer.INSTANCE.deserialize(dataInputViewStreamWrapper);
+                servable.setDelta(delta);
+            }
+            return servable;
+        }
+
+        public SumModelServable setDelta(int delta) {

Review Comment:
   Should we replace this method with `SumModelServable#setModelData(...)` so that users can set the model data using the public API designed in FLIP-289?
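   A minimal sketch of what the suggested change could look like, assuming a FLIP-289 style signature `setModelData(InputStream... modelDataInputs)` and that the saved model data is a single 4-byte big-endian int, which is what `IntSerializer` reads and writes through `DataInputView#readInt` / `DataOutputView#writeInt`. To stay self-contained it uses plain `java.io` streams and a `List<Integer>` in place of Flink ML's `DataFrame`; `SumServableSketch` and `encodeModelData` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SumServableSketch {

    private int delta;

    /** Reads `delta` from the model data stream instead of exposing setDelta(int). */
    public SumServableSketch setModelData(InputStream... modelDataInputs) throws IOException {
        if (modelDataInputs.length != 1) {
            throw new IllegalArgumentException("Expected exactly one model data stream.");
        }
        // The caller owns the stream and is responsible for closing it.
        DataInputStream input = new DataInputStream(modelDataInputs[0]);
        delta = input.readInt();
        return this;
    }

    /** Adds `delta` to every input value, mirroring SumModelServable#transform. */
    public List<Integer> transform(List<Integer> values) {
        List<Integer> output = new ArrayList<>();
        for (int value : values) {
            output.add(value + delta);
        }
        return output;
    }

    /** Encodes an int the same way DataOutput#writeInt does (4 bytes, big-endian). */
    static byte[] encodeModelData(int delta) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream output = new DataOutputStream(bytes)) {
            output.writeInt(delta);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        SumServableSketch servable =
                new SumServableSketch()
                        .setModelData(new ByteArrayInputStream(encodeModelData(10)));
        System.out.println(servable.transform(Arrays.asList(1, 2, 3))); // [11, 12, 13]
    }
}
```

   Taking streams rather than a decoded `int` keeps the decoding logic inside the servable, so callers never need to know the model data format.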



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/ExampleStages.java:
##########
@@ -110,6 +111,10 @@ public static SumModel load(StreamTableEnvironment tEnv, String path) throws IOE
             SumModel model = ReadWriteUtils.loadStageParam(path);
             return model.setModelData(modelDataTable);
         }
+
+        public static SumModelServable loadServable(String path) throws IOException {

Review Comment:
   Would it be useful to add a test similar to the following code snippet to cover this method?
   
   ```
   SumModel model = ...;
   model.save(path);
   SumModelServable servable = SumModelServable.load(path);
   Assert.assertEquals(expected, servable.transform(...));
   ```



##########
flink-ml-servable-core/src/main/java/org/apache/flink/ml/util/FileUtils.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Preconditions;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.util.Map;
+
+/** Utility methods for file operations. */
+public class FileUtils {
+
+    /** Saves a given string to the specified file. */
+    public static void saveToFile(String pathStr, String content, boolean isOverwrite)
+            throws IOException {
+        Path path = new Path(pathStr);
+
+        // Creates parent directories if not already created.
+        FileSystem fs = mkdirs(path.getParent());
+
+        FileSystem.WriteMode writeMode = FileSystem.WriteMode.OVERWRITE;
+        if (!isOverwrite) {
+            writeMode = FileSystem.WriteMode.NO_OVERWRITE;
+            if (fs.exists(path)) {
+                throw new IOException("File " + path + " already exists.");
+            }
+        }
+        try (BufferedWriter writer =
+                new BufferedWriter(new OutputStreamWriter(fs.create(path, writeMode)))) {
+            writer.write(content);
+        }
+    }
+
+    public static FileSystem mkdirs(Path path) throws IOException {
+        FileSystem fs = path.getFileSystem();
+        fs.mkdirs(path);
+        return fs;
+    }
+
+    /**
+     * Loads the metadata from the metadata file under the given path.
+     *
+     * <p>The method throws RuntimeException if the expectedClassName is not empty AND it does not
+     * match the className of the previously saved stage.
+     *
+     * @param path The parent directory of the metadata file to read from.
+     * @param expectedClassName The expected class name of the stage.
+     * @return A map from metadata name to metadata value.
+     */
+    public static Map<String, ?> loadMetadata(String path, String expectedClassName)
+            throws IOException {
+        Path metadataPath = new Path(path, "metadata");
+        FileSystem fs = metadataPath.getFileSystem();
+
+        StringBuilder buffer = new StringBuilder();
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(metadataPath)))) {
+            String line;
+            while ((line = br.readLine()) != null) {
+                if (!line.startsWith("#")) {
+                    buffer.append(line);
+                }
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        Map<String, ?> result = JsonUtils.OBJECT_MAPPER.readValue(buffer.toString(), Map.class);
+
+        String className = (String) result.get("className");
+        if (!expectedClassName.isEmpty() && !expectedClassName.equals(className)) {
+            throw new RuntimeException(
+                    "Class name "
+                            + className
+                            + " does not match the expected class name "
+                            + expectedClassName
+                            + ".");
+        }
+
+        return result;
+    }
+
+    // Returns a string with value {parentPath}/stages/{stageIdx}, where the stageIdx is prefixed
+    // with zero or more `0` to have the same length as numStages. The resulting string can be
+    // used as the directory to save a stage of the Pipeline or PipelineModel.
+    public static String getPathForPipelineStage(int stageIdx, int numStages, String parentPath) {
+        String format =
+                String.format("stages%s%%0%dd", File.separator, String.valueOf(numStages).length());
+        String fileName = String.format(format, stageIdx);
+        return new Path(parentPath, fileName).toString();
+    }
+
+    /** Returns a subdirectory of the given path for saving/loading model data. */
+    public static Path getDataPath(String path) {
+        return new Path(path, "data");
+    }
+
+    /**
+     * Opens an FSDataInputStream to read the model data file in the directory. Only one model data
+     * file is expected to be in the directory.
+     *
+     * @param path The parent directory of the model data file.
+     * @return A FSDataInputStream to read the model data.
+     */
+    public static FSDataInputStream getModelDataInputStream(Path path) throws IOException {

Review Comment:
   Would the following function signature be simpler to use and more consistent with the existing `ReadWriteUtils#loadModelData`?
   
   ```
   public static InputStream loadModelData(String path) throws IOException
   ```

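   `getPathForPipelineStage` in the diff above builds its `String.format` pattern in two steps so that each stage index is zero-padded to the width of `numStages`; equal-width indices also sort lexicographically in numeric order. A minimal standalone sketch of the same padding rule, assuming `/` as the separator instead of `File.separator` and plain string concatenation instead of Flink's `Path` (`StagePathSketch` is a hypothetical name):

```java
public class StagePathSketch {

    /** Returns {parentPath}/stages/{stageIdx}, zero-padding stageIdx to the width of numStages. */
    static String getPathForPipelineStage(int stageIdx, int numStages, String parentPath) {
        int width = String.valueOf(numStages).length();
        // For width 2 this builds the pattern "%02d", so stage 3 becomes "03".
        String fileName = String.format("%0" + width + "d", stageIdx);
        return parentPath + "/stages/" + fileName;
    }

    public static void main(String[] args) {
        System.out.println(getPathForPipelineStage(3, 12, "/tmp/pipeline")); // /tmp/pipeline/stages/03
        System.out.println(getPathForPipelineStage(0, 5, "/tmp/pipeline")); // /tmp/pipeline/stages/0
    }
}
```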


##########
flink-ml-core/src/main/java/org/apache/flink/ml/builder/PipelineModel.java:
##########
@@ -82,6 +85,33 @@ public static PipelineModel load(StreamTableEnvironment tEnv, String path) throw
                 ReadWriteUtils.loadPipeline(tEnv, path, PipelineModel.class.getName()));
     }
 
+    public static PipelineModelServable loadServable(String path) throws IOException {
+        return PipelineModelServable.load(path);
+    }
+
+    /**
+     * Whether all stages in the pipeline have corresponding {@link TransformerServable} so that the
+     * PipelineModel can be turned into a TransformerServable and used in an online inference
+     * program.
+     *
+     * @return true if all stages have corresponding TransformerServable, false if not.
+     */
+    public boolean supportServable() {
+        for (Stage stage : stages) {
+            if (!(stage instanceof Transformer)) {
+                return false;
+            }
+            Transformer transformer = (Transformer) stage;

Review Comment:
   nits: `Transformer<?> transformer = (Transformer<?>) stage`



##########
flink-ml-servable-core/src/main/java/org/apache/flink/ml/servable/types/DataTypes.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable.types;
+
+/** This class gives access to the most common types that are used to define DataFrames. */
+public class DataTypes {
+
+    public static final ScalarType BOOLEAN = new ScalarType(BasicType.BOOLEAN);
+
+    public static final ScalarType BYTE = new ScalarType(BasicType.BYTE);
+
+    public static final ScalarType SHORT = new ScalarType(BasicType.SHORT);
+
+    public static final ScalarType INT = new ScalarType(BasicType.INT);
+
+    public static final ScalarType LONG = new ScalarType(BasicType.LONG);
+
+    public static final ScalarType FLOAT = new ScalarType(BasicType.FLOAT);
+
+    public static final ScalarType DOUBLE = new ScalarType(BasicType.DOUBLE);
+
+    public static final ScalarType STRING = new ScalarType(BasicType.STRING);
+
+    public static final ScalarType BYTE_STRING = new ScalarType(BasicType.BYTE_STRING);
+
+    public static VectorType VECTOR(BasicType elementType) {

Review Comment:
   Should we name this method `vector(...)` to follow the existing code style guideline?





[GitHub] [flink-ml] jiangxin369 commented on pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on PR #218:
URL: https://github.com/apache/flink-ml/pull/218#issuecomment-1465977728

   @lindong28 Thanks for the review, I've updated the PR.




[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133042094


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {

Review Comment:
   nits: can we name it `testPipelineModelServable` for consistency with `testPipelineModel` and `testPipeline` in this class?



##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {
+        SumModel modelA = new SumModel().setModelData(tEnv.fromValues(10));
+        SumModel modelB = new SumModel().setModelData(tEnv.fromValues(20));
+        SumModel modelC = new SumModel().setModelData(tEnv.fromValues(30));
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB, modelC);
+        Model<?> model = new PipelineModel(stages);
+
+        PipelineModelServable servable =
+                saveAndLoadServable(tEnv, model, tempFolder.newFolder().getAbsolutePath());

Review Comment:
   nits: can we call `PipelineModel#loadServable()` here directly so that developers can tell where this public API is tested via IDE?
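In `testLoadServable`, three `SumModel` stages with model data 10, 20 and 30 are saved and then loaded as one `PipelineModelServable`. The PR describes that servable as chaining a sequence of servables. The sketch below shows the behavior the test presumably checks: each servable's output becomes the next servable's input. It assumes, from the stage name, that a `SumModel` servable adds its model-data value to every input value. `LongServable`, `SumServableSketch` and `ChainedServableSketch` are hypothetical types. A plain `List<Long>` stands in for the servable `DataFrame`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical single-column servable; NOT the Flink ML TransformerServable API. */
interface LongServable {
    List<Long> transform(List<Long> input);
}

/** Adds a fixed delta to every value (assumed SumModel semantics). */
class SumServableSketch implements LongServable {
    private final long delta;

    SumServableSketch(long delta) {
        this.delta = delta;
    }

    @Override
    public List<Long> transform(List<Long> input) {
        List<Long> output = new ArrayList<>(input.size());
        for (long value : input) {
            output.add(value + delta);
        }
        return output;
    }
}

/** Applies each servable in order, feeding the output of one into the next. */
class ChainedServableSketch implements LongServable {
    private final List<LongServable> servables;

    ChainedServableSketch(List<LongServable> servables) {
        this.servables = servables;
    }

    @Override
    public List<Long> transform(List<Long> input) {
        List<Long> current = input;
        for (LongServable servable : servables) {
            current = servable.transform(current);
        }
        return current;
    }
}
```

Under these assumptions, chaining deltas 10, 20 and 30 over inputs 1, 2, 3 gives 61, 62, 63. The test can then assert a single expected output for the whole pipeline.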



##########
flink-ml-servable-core/src/main/java/org/apache/flink/ml/util/ServableReadWriteUtils.java:
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.util;
+
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.servable.api.TransformerServable;
+import org.apache.flink.ml.servable.builder.PipelineModelServable;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.ml.util.FileUtils.loadMetadata;
+
+/** Utility methods for loading Servables. */
+public class ServableReadWriteUtils {
+
+    /**
+     * Loads the servables of a {@link PipelineModelServable} from the given path.
+     *
+     * <p>The method throws RuntimeException if the expectedClassName is not empty AND it does not
+     * match the className of the previously saved PipelineModel.
+     *
+     * @param path The parent directory to load the PipelineModelServable metadata and its
+     *     servables.
+     * @return A list of servables.
+     */
+    public static List<TransformerServable<?>> loadPipeline(String path) throws IOException {
+        Map<String, ?> metadata = loadMetadata(path, "");
+        int numStages = (Integer) metadata.get("numStages");
+        List<TransformerServable<?>> servables = new ArrayList<>(numStages);
+
+        for (int i = 0; i < numStages; i++) {
+            String stagePath = FileUtils.getPathForPipelineStage(i, numStages, path);
+            servables.add(loadServable(stagePath));
+        }
+        return servables;
+    }
+
+    /**
+     * Loads the {@link TransformerServable} from the given path by invoking the static
+     * loadServable() method of the stage. The stage class name is read from the metadata file under
+     * the given path. The loadServable() method is expected to construct the TransformerServable
+     * instance with the saved parameters, model data and other metadata if exists.
+     *
+     * <p>Required: the stage class must have a static loadServable() method.
+     *
+     * @param path The parent directory of the stage metadata file.
+     * @return An instance of {@link TransformerServable}.
+     */
+    private static TransformerServable<?> loadServable(String path) throws IOException {
+        Map<String, ?> metadata = FileUtils.loadMetadata(path, "");
+        String className = (String) metadata.get("className");
+
+        try {
+            Class<?> clazz = Class.forName(className);
+            Method method = clazz.getMethod("loadServable", String.class);
+            method.setAccessible(true);
+            return (TransformerServable<?>) method.invoke(null, path);
+        } catch (NoSuchMethodException e) {
+            String methodName = String.format("%s::loadServable(String)", className);
+            throw new RuntimeException(
+                    "Failed to load servable because the static method "
+                            + methodName
+                            + " is not implemented.",
+                    e);
+        } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException("Failed to load servable.", e);
+        }
+    }
+
+    /**
+     * Loads the {@link TransformerServable} with the saved parameters from the given path. This
+     * method reads the metadata file under the given path, instantiates the servable using its
+     * no-argument constructor, and loads the servable with the paramMap from the metadata file.
+     *
+     * <p>Note: This method does not attempt to read model data from the given path. Caller needs to
+     * read and deserialize model data from the given path.
+     *
+     * <p>Required: the class with type T must have a no-argument constructor.
+     *
+     * @param path The parent directory of the metadata file.
+     * @param <T> The class type of the TransformerServable subclass.
+     * @return An instance of class type T.
+     */
+    public static <T extends TransformerServable<T>> T loadServableParam(
+            String path, Class<T> clazz) throws IOException {
+        T instance = InstantiationUtil.instantiate(clazz);
+
+        Map<String, Param<?>> nameToParam = new HashMap<>();
+        for (Param<?> param : ParamUtils.getPublicFinalParamFields(instance)) {
+            nameToParam.put(param.name, param);
+        }
+
+        Map<String, ?> jsonMap = loadMetadata(path, "");
+        if (jsonMap.containsKey("paramMap")) {
+            Map<String, Object> paramMap = (Map<String, Object>) jsonMap.get("paramMap");
+            for (Map.Entry<String, Object> entry : paramMap.entrySet()) {
+                Param<?> param = nameToParam.get(entry.getKey());
+                ParamUtils.setParam(instance, param, param.jsonDecode(entry.getValue()));
+            }
+        }
+
+        return instance;
+    }
+
+    /**
+     * Opens an FSDataInputStream to read the model data file in the directory. Only one model data
+     * file is expected to be in the directory.
+     *
+     * @param path The parent directory of the model data file.
+     * @return A FSDataInputStream to read the model data.
+     */
+    public static InputStream loadModelData(String path) throws IOException {
+

Review Comment:
   nits: remove the empty line here.
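`ServableReadWriteUtils#loadServable` above does three things. It reads a class name from the stage metadata, looks up that class's static `loadServable(String)` method with reflection, and invokes the method with a `null` receiver. Below is a self-contained sketch of that lookup, using only `java.lang.reflect`. `DemoServable` and `ReflectiveLoaderSketch` are hypothetical. The class name is passed in directly instead of being read from a metadata file. The sketch also checks that the method is static, which the quoted code assumes but does not verify.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

/** Hypothetical servable type that records the path it was loaded from. */
class DemoServable {
    final String path;

    DemoServable(String path) {
        this.path = path;
    }

    /** Static factory that the reflective loader looks up by name. */
    public static DemoServable loadServable(String path) {
        return new DemoServable(path);
    }
}

class ReflectiveLoaderSketch {
    /** Invokes the public static loadServable(String) method of the named class. */
    static Object loadServable(String className, String path) {
        try {
            Class<?> clazz = Class.forName(className);
            Method method = clazz.getMethod("loadServable", String.class);
            if (!Modifier.isStatic(method.getModifiers())) {
                throw new RuntimeException(className + "::loadServable(String) must be static.");
            }
            // A null receiver is correct when invoking a static method.
            return method.invoke(null, path);
        } catch (NoSuchMethodException e) {
            throw new RuntimeException(
                    "Failed to load servable because the static method "
                            + className
                            + "::loadServable(String) is not implemented.",
                    e);
        } catch (ClassNotFoundException | IllegalAccessException | InvocationTargetException e) {
            throw new RuntimeException("Failed to load servable.", e);
        }
    }
}
```

This pattern ties each model class to its servable only by a naming convention. A missing factory therefore surfaces at load time as a `RuntimeException` wrapping `NoSuchMethodException`, not at compile time.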



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink-ml] lindong28 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1133236486


##########
flink-ml-core/src/test/java/org/apache/flink/ml/api/PipelineTest.java:
##########
@@ -95,4 +109,59 @@ public void testPipeline() throws Exception {
         // Executes the loaded Pipeline and verifies that it produces the expected output.
         TestUtils.executeAndCheckOutput(env, loadedEstimator, inputs, output, null, null);
     }
+
+    @Test
+    public void testSupportServable() {
+        SumEstimator estimatorA = new SumEstimator();
+        UnionAlgoOperator algoOperatorA = new UnionAlgoOperator();
+        SumModel modelA = new SumModel();
+        SumModel modelB = new SumModel();
+
+        List<Stage<?>> stages = Arrays.asList(modelA, modelB);
+        PipelineModel pipelineModel = new PipelineModel(stages);
+        assertTrue(pipelineModel.supportServable());
+
+        stages = Arrays.asList(estimatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+
+        stages = Arrays.asList(algoOperatorA, modelA);
+        pipelineModel = new PipelineModel(stages);
+        assertFalse(pipelineModel.supportServable());
+    }
+
+    @Test
+    public void testLoadServable() throws Exception {

Review Comment:
   This test also covers the method `PipelineModelServable#transform` in addition to `PipelineModel#loadServable`. Which part of the `PipelineModelServable` lifecycle is not tested?
   
   Also, should we remove `PipelineModelServableTest` since all its content is covered by the test here?





[GitHub] [flink-ml] lindong28 commented on pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "lindong28 (via GitHub)" <gi...@apache.org>.
lindong28 commented on PR #218:
URL: https://github.com/apache/flink-ml/pull/218#issuecomment-1466025161

   Thanks for the update. LGTM.




[GitHub] [flink-ml] jiangxin369 commented on a diff in pull request #218: [FLINK-31306] Add Servable for PipelineModel

Posted by "jiangxin369 (via GitHub)" <gi...@apache.org>.
jiangxin369 commented on code in PR #218:
URL: https://github.com/apache/flink-ml/pull/218#discussion_r1130715217


##########
flink-ml-servable-core/src/test/java/org/apache/flink/ml/servable/TestUtils.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.servable;
+
+import org.apache.flink.ml.servable.api.DataFrame;
+import org.apache.flink.ml.servable.api.Row;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/** Utility methods for tests. */
+public class TestUtils {
+
+    /** Compares two dataframes. */
+    public static void compareDataFrame(DataFrame first, DataFrame second) {

Review Comment:
   I renamed it to `assertDataFrameEquals`, following the naming of `Assert#assertArrayEquals` and `StageTest#assertParamMapEquals`.
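The `assert...Equals` name also suggests JUnit's argument order: expected first, actual second, and failure with a message that names the mismatch. Below is a minimal sketch of such a helper. It assumes a data frame can be compared by its column names and its row values in order. `SimpleDataFrame` and `DataFrameAssertions` are hypothetical stand-ins, not the `org.apache.flink.ml.servable.api.DataFrame` API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for a servable DataFrame: column names plus row values. */
class SimpleDataFrame {
    final List<String> columnNames;
    final List<List<Object>> rows;

    SimpleDataFrame(List<String> columnNames, List<List<Object>> rows) {
        this.columnNames = columnNames;
        this.rows = rows;
    }
}

class DataFrameAssertions {
    /** Throws AssertionError describing the first difference between the two frames. */
    static void assertDataFrameEquals(SimpleDataFrame expected, SimpleDataFrame actual) {
        if (!expected.columnNames.equals(actual.columnNames)) {
            throw new AssertionError(
                    "Column names differ: expected " + expected.columnNames
                            + " but was " + actual.columnNames);
        }
        if (expected.rows.size() != actual.rows.size()) {
            throw new AssertionError(
                    "Row count differs: expected " + expected.rows.size()
                            + " but was " + actual.rows.size());
        }
        for (int i = 0; i < expected.rows.size(); i++) {
            if (!Objects.equals(expected.rows.get(i), actual.rows.get(i))) {
                throw new AssertionError(
                        "Row " + i + " differs: expected " + expected.rows.get(i)
                                + " but was " + actual.rows.get(i));
            }
        }
    }
}
```

Reporting the first mismatching row, instead of returning a boolean, makes test failures easier to diagnose.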


