You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/09/30 09:49:22 UTC

[flink-ml] branch master updated: [FLINK-29318] Add Transformer for PolynomialExpansion

This is an automated email from the ASF dual-hosted git repository.

zhangzp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a6ecde  [FLINK-29318] Add Transformer for PolynomialExpansion
4a6ecde is described below

commit 4a6ecdee6cdc978e16333ac22b0f5b9083514049
Author: weibo <wb...@pku.edu.cn>
AuthorDate: Fri Sep 30 17:49:16 2022 +0800

    [FLINK-29318] Add Transformer for PolynomialExpansion
    
    This closes #155.
---
 .../docs/operators/feature/polynomialexpansion.md  | 160 ++++++++++++
 .../feature/PolynomialExpansionExample.java        |  66 +++++
 .../polynomialexpansion/PolynomialExpansion.java   | 289 +++++++++++++++++++++
 .../PolynomialExpansionParams.java                 |  44 ++++
 .../flink/ml/feature/PolynomialExpansionTest.java  | 199 ++++++++++++++
 .../ml/feature/polynomialexpansion_example.py      |  58 +++++
 .../pyflink/ml/lib/feature/polynomialexpansion.py  |  75 ++++++
 .../lib/feature/tests/test_polynomialexpansion.py  |  74 ++++++
 8 files changed, 965 insertions(+)

diff --git a/docs/content/docs/operators/feature/polynomialexpansion.md b/docs/content/docs/operators/feature/polynomialexpansion.md
new file mode 100644
index 0000000..06adaf8
--- /dev/null
+++ b/docs/content/docs/operators/feature/polynomialexpansion.md
@@ -0,0 +1,160 @@
+---
+title: "PolynomialExpansion"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/polynomialexpansion.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## PolynomialExpansion
+
+A Transformer that expands the input vectors in polynomial space.
+
+Take a 2-dimensional vector `(x, y)` as an example: if we expand it with degree 2, then
+we get `(x, x * x, y, x * y, y * y)`.
+
+For more information about the polynomial expansion, see
+http://en.wikipedia.org/wiki/Polynomial_expansion.
+
+### Input Columns
+
+| Param name | Type   | Default   | Description             |
+|:-----------|:-------|:----------|:------------------------|
+| inputCol   | Vector | `"input"` | Vectors to be expanded. |
+
+### Output Columns
+
+| Param name | Type   | Default    | Description       |
+|:-----------|:-------|:-----------|:------------------|
+| outputCol  | Vector | `"output"` | Expanded vectors. |
+
+### Parameters
+
+| Key       | Default    | Type    | Required | Description                         |
+|:----------|:-----------|:--------|:---------|:------------------------------------|
+| inputCol  | `"input"`  | String  | no       | Input column name.                  |
+| outputCol | `"output"` | String  | no       | Output column name.                 |
+| degree    | `2`        | Integer | no       | Degree of the polynomial expansion. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.polynomialexpansion.PolynomialExpansion;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that creates a PolynomialExpansion instance and uses it for feature engineering. */
+public class PolynomialExpansionExample {
+	public static void main(String[] args) {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+		// Generates input data.
+		DataStream<Row> inputStream =
+			env.fromElements(
+				Row.of(Vectors.dense(2.1, 3.1, 1.2)),
+				Row.of(Vectors.dense(1.2, 3.1, 4.6)));
+		Table inputTable = tEnv.fromDataStream(inputStream).as("inputVec");
+
+		// Creates a PolynomialExpansion object and initializes its parameters.
+		PolynomialExpansion polynomialExpansion =
+			new PolynomialExpansion().setInputCol("inputVec").setDegree(2).setOutputCol("outputVec");
+
+		// Uses the PolynomialExpansion object for feature transformations.
+		Table outputTable = polynomialExpansion.transform(inputTable)[0];
+
+		// Extracts and displays the results.
+		for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+			Row row = it.next();
+
+			Vector inputValue = (Vector) row.getField(polynomialExpansion.getInputCol());
+
+			Vector outputValue = (Vector) row.getField(polynomialExpansion.getOutputCol());
+
+			System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
+		}
+	}
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a PolynomialExpansion instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.polynomialexpansion import PolynomialExpansion
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+        (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'input_vec'],
+            [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a polynomial expansion object and initialize its parameters
+polynomialExpansion = PolynomialExpansion() \
+    .set_input_col('input_vec') \
+    .set_degree(2) \
+    .set_output_col('output_vec')
+
+# use the polynomial expansion object for feature engineering
+output = polynomialExpansion.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(polynomialExpansion.get_input_col())]
+    output_value = result[field_names.index(polynomialExpansion.get_output_col())]
+    print('Input Value: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/PolynomialExpansionExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/PolynomialExpansionExample.java
new file mode 100644
index 0000000..e4e6e20
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/PolynomialExpansionExample.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.polynomialexpansion.PolynomialExpansion;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/**
+ * Simple program that creates a PolynomialExpansion instance and uses it for feature engineering.
+ */
+public class PolynomialExpansionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Generates input data.
+        DataStream<Row> inputStream =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.1, 3.1, 1.2)), Row.of(Vectors.dense(1.2, 3.1, 4.6)));
+        Table inputTable = tEnv.fromDataStream(inputStream).as("inputVec");
+
+        // Creates a PolynomialExpansion object and initializes its parameters.
+        PolynomialExpansion polynomialExpansion =
+                new PolynomialExpansion()
+                        .setInputCol("inputVec")
+                        .setDegree(2)
+                        .setOutputCol("outputVec");
+
+        // Uses the PolynomialExpansion object for feature transformations.
+        Table outputTable = polynomialExpansion.transform(inputTable)[0];
+
+        // Extracts and displays the results.
+        for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+            Row row = it.next();
+
+            Vector inputValue = (Vector) row.getField(polynomialExpansion.getInputCol());
+
+            Vector outputValue = (Vector) row.getField(polynomialExpansion.getOutputCol());
+
+            System.out.printf("Input Value: %s \tOutput Value: %s\n", inputValue, outputValue);
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java
new file mode 100644
index 0000000..86fd2ea
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Transformer that expands the input vectors in polynomial space.
+ *
+ * <p>Take a 2-dimensional vector {@code (x, y)} as an example: if we expand it with degree 2,
+ * then we get {@code (x, x * x, y, x * y, y * y)}.
+ *
+ * <p>For more information about the polynomial expansion, see
+ * http://en.wikipedia.org/wiki/Polynomial_expansion.
+ */
+public class PolynomialExpansion
+        implements Transformer<PolynomialExpansion>,
+                PolynomialExpansionParams<PolynomialExpansion> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public PolynomialExpansion() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new PolynomialExpansionFunction(getDegree(), getInputCol()),
+                                outputTypeInfo);
+
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static PolynomialExpansion load(StreamTableEnvironment env, String path)
+            throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /**
+     * Polynomial expansion function that expands a vector in polynomial space. This expansion is
+     * done using recursion. Given input vector and degree, the size after expansion is (vectorSize
+     * + degree) choose degree (including 1 and first-order values). For example, let
+     * f([a, b, c], 3) be the function that expands [a, b, c] to their monomials of degree 3. We
+     * have the following recursion:
+     *
+     * <blockquote>
+     *
+     * $$ f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3] $$
+     *
+     * </blockquote>
+     *
+     * <p>To handle sparsity, if c is zero, we can skip all monomials that contain it. We remember
+     * the current index and increment it properly for sparse input.
+     */
+    private static class PolynomialExpansionFunction implements MapFunction<Row, Row> {
+        private final int degree;
+        private final String inputCol;
+
+        public PolynomialExpansionFunction(int degree, String inputCol) {
+            this.degree = degree;
+            this.inputCol = inputCol;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector vec = row.getFieldAs(inputCol);
+            if (vec == null) {
+                throw new IllegalArgumentException("The vector must not be null.");
+            }
+            Vector outputVec;
+            if (vec instanceof DenseVector) {
+                int size = vec.size();
+                double[] retVals = new double[getResultVectorSize(size, degree) - 1];
+                expandDenseVector(((DenseVector) vec).values, size - 1, degree, 1.0, retVals, -1);
+                outputVec = new DenseVector(retVals);
+            } else if (vec instanceof SparseVector) {
+                SparseVector sparseVec = (SparseVector) vec;
+                int[] indices = sparseVec.indices;
+                double[] values = sparseVec.values;
+                int size = sparseVec.size();
+                int nnz = sparseVec.values.length;
+                int nnzPolySize = getResultVectorSize(nnz, degree);
+
+                Tuple2<Integer, int[]> polyIndices = Tuple2.of(0, new int[nnzPolySize - 1]);
+                Tuple2<Integer, double[]> polyValues = Tuple2.of(0, new double[nnzPolySize - 1]);
+                expandSparseVector(
+                        indices,
+                        values,
+                        nnz - 1,
+                        size - 1,
+                        degree,
+                        1.0,
+                        polyIndices,
+                        polyValues,
+                        -1);
+
+                outputVec =
+                        new SparseVector(
+                                getResultVectorSize(size, degree) - 1,
+                                polyIndices.f1,
+                                polyValues.f1);
+            } else {
+                throw new UnsupportedOperationException(
+                        "Only supports DenseVector or SparseVector.");
+            }
+            return Row.join(row, Row.of(outputVec));
+        }
+
+        /** Calculates the length of the expanded vector. */
+        private static int getResultVectorSize(int num, int degree) {
+            if (num == 0) {
+                return 1;
+            }
+
+            if (num == 1 || degree == 1) {
+                return num + degree;
+            }
+
+            if (degree > num) {
+                return getResultVectorSize(degree, num);
+            }
+
+            long res = 1;
+            int i = num + 1;
+            int j;
+
+            if (num + degree < 61) {
+                for (j = 1; j <= degree; ++j) {
+                    res = res * i / j;
+                    ++i;
+                }
+            } else {
+                int depth;
+                for (j = 1; j <= degree; ++j) {
+                    depth = ArithmeticUtils.gcd(i, j);
+                    res = ArithmeticUtils.mulAndCheck(res / (j / depth), i / depth);
+                    ++i;
+                }
+            }
+
+            if (res > Integer.MAX_VALUE) {
+                throw new RuntimeException("The expended polynomial size is too large.");
+            }
+            return (int) res;
+        }
+
+        /** Expands the dense vector in polynomial space. */
+        private static int expandDenseVector(
+                double[] values,
+                int lastIdx,
+                int degree,
+                double factor,
+                double[] retValues,
+                int curPolyIdx) {
+            if (!Double.valueOf(factor).equals(0.0)) {
+                if (degree == 0 || lastIdx < 0) {
+                    if (curPolyIdx >= 0) {
+                        retValues[curPolyIdx] = factor;
+                    }
+                } else {
+                    double v = values[lastIdx];
+                    int newLastIdx = lastIdx - 1;
+                    double alpha = factor;
+                    int i = 0;
+                    int curStart = curPolyIdx;
+
+                    while (i <= degree && Math.abs(alpha) > 0.0) {
+                        curStart =
+                                expandDenseVector(
+                                        values, newLastIdx, degree - i, alpha, retValues, curStart);
+                        i += 1;
+                        alpha *= v;
+                    }
+                }
+            }
+            return curPolyIdx + getResultVectorSize(lastIdx + 1, degree);
+        }
+
+        /** Expands the sparse vector in polynomial space. */
+        private static int expandSparseVector(
+                int[] indices,
+                double[] values,
+                int lastIdx,
+                int lastFeatureIdx,
+                int degree,
+                double factor,
+                Tuple2<Integer, int[]> polyIndices,
+                Tuple2<Integer, double[]> polyValues,
+                int curPolyIdx) {
+            if (!Double.valueOf(factor).equals(0.0)) {
+                if (degree == 0 || lastIdx < 0) {
+                    if (curPolyIdx >= 0) {
+                        polyIndices.f1[polyIndices.f0] = curPolyIdx;
+                        polyValues.f1[polyValues.f0] = factor;
+                        polyIndices.f0++;
+                        polyValues.f0++;
+                    }
+                } else {
+                    double v = values[lastIdx];
+                    int lastIdx1 = lastIdx - 1;
+                    int lastFeatureIdx1 = indices[lastIdx] - 1;
+                    double alpha = factor;
+                    int curStart = curPolyIdx;
+                    int i = 0;
+
+                    while (i <= degree && Math.abs(alpha) > 0.0) {
+                        curStart =
+                                expandSparseVector(
+                                        indices,
+                                        values,
+                                        lastIdx1,
+                                        lastFeatureIdx1,
+                                        degree - i,
+                                        alpha,
+                                        polyIndices,
+                                        polyValues,
+                                        curStart);
+                        i++;
+                        alpha *= v;
+                    }
+                }
+            }
+            return curPolyIdx + getResultVectorSize(lastFeatureIdx + 1, degree);
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansionParams.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansionParams.java
new file mode 100644
index 0000000..abbc983
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansionParams.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link PolynomialExpansion}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface PolynomialExpansionParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Integer> DEGREE =
+            new IntParam(
+                    "degree", "Degree of the polynomial expansion.", 2, ParamValidators.gtEq(1));
+
+    default int getDegree() {
+        return get(DEGREE);
+    }
+
+    default T setDegree(Integer value) {
+        return set(DEGREE, value);
+    }
+}
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
new file mode 100644
index 0000000..a61e7d8
--- /dev/null
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.polynomialexpansion.PolynomialExpansion;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link PolynomialExpansion}. */
+public class PolynomialExpansionTest extends AbstractTestBase {
+
+    private StreamTableEnvironment tEnv;
+    private Table inputDataTable;
+
+    private static final List<Row> INPUT_DATA =
+            Arrays.asList(
+                    Row.of(
+                            Vectors.dense(1.0, 2.0, 3.0),
+                            Vectors.sparse(5, new int[] {1, 4}, new double[] {2.0, 3.0})),
+                    Row.of(
+                            Vectors.dense(2.0, 3.0),
+                            Vectors.sparse(5, new int[] {1, 4}, new double[] {2.0, 1.0})));
+
+    private static final List<Vector> EXPECTED_DENSE_OUTPUT =
+            Arrays.asList(
+                    Vectors.dense(1.0, 1.0, 2.0, 2.0, 4.0, 3.0, 3.0, 6.0, 9.0),
+                    Vectors.dense(2.0, 4.0, 3.0, 6.0, 9.0));
+
+    private static final List<Vector> EXPECTED_DENSE_OUTPUT_WITH_DEGREE_3 =
+            Arrays.asList(
+                    Vectors.dense(
+                            1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 4.0, 4.0, 8.0, 3.0, 3.0, 3.0, 6.0, 6.0,
+                            12.0, 9.0, 9.0, 18.0, 27.0),
+                    Vectors.dense(2.0, 4.0, 8.0, 3.0, 6.0, 12.0, 9.0, 18.0, 27.0));
+
+    private static final List<Vector> EXPECTED_SPARSE_OUTPUT =
+            Arrays.asList(
+                    Vectors.sparse(
+                            55,
+                            new int[] {3, 6, 8, 34, 37, 39, 49, 51, 54},
+                            new double[] {2.0, 4.0, 8.0, 3.0, 6.0, 12.0, 9.0, 18.0, 27.0}),
+                    Vectors.sparse(
+                            55,
+                            new int[] {3, 6, 8, 34, 37, 39, 49, 51, 54},
+                            new double[] {2.0, 4.0, 8.0, 1.0, 2.0, 4.0, 1.0, 2.0, 1.0}));
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+
+        tEnv = StreamTableEnvironment.create(env);
+        DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+        inputDataTable = tEnv.fromDataStream(dataStream).as("denseVec", "sparseVec");
+    }
+
+    private void verifyOutputResult(Table output, String outputCol, List<Vector> expectedData)
+            throws Exception {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) output).getTableEnvironment();
+        DataStream<Row> stream = tEnv.toDataStream(output);
+
+        List<Row> results = IteratorUtils.toList(stream.executeAndCollect());
+        List<Vector> resultVec = new ArrayList<>(results.size());
+        for (Row row : results) {
+            if (row.getField(outputCol) != null) {
+                resultVec.add(row.getFieldAs(outputCol));
+            }
+        }
+        compareResultCollections(expectedData, resultVec, TestUtils::compare);
+    }
+
+    @Test
+    public void testParam() {
+        PolynomialExpansion polynomialExpansion = new PolynomialExpansion();
+        assertEquals("input", polynomialExpansion.getInputCol());
+        assertEquals("output", polynomialExpansion.getOutputCol());
+        assertEquals(2, polynomialExpansion.getDegree());
+
+        polynomialExpansion.setInputCol("denseVec").setOutputCol("outputVec").setDegree(5);
+        assertEquals("denseVec", polynomialExpansion.getInputCol());
+        assertEquals("outputVec", polynomialExpansion.getOutputCol());
+        assertEquals(5, polynomialExpansion.getDegree());
+    }
+
+    @Test
+    public void testOutputSchema() {
+        PolynomialExpansion polynomialExpansion =
+                new PolynomialExpansion()
+                        .setInputCol("denseVec")
+                        .setOutputCol("outputVec")
+                        .setDegree(3);
+
+        Table output = polynomialExpansion.transform(inputDataTable)[0];
+
+        assertEquals(
+                Arrays.asList("denseVec", "sparseVec", "outputVec"),
+                output.getResolvedSchema().getColumnNames());
+    }
+
+    @Test
+    public void testSaveLoadAndTransform() throws Exception {
+        PolynomialExpansion polynomialExpansion =
+                new PolynomialExpansion()
+                        .setInputCol("denseVec")
+                        .setOutputCol("outputVec")
+                        .setDegree(2);
+
+        PolynomialExpansion loadedPolynomialExpansion =
+                TestUtils.saveAndReload(
+                        tEnv, polynomialExpansion, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+        Table output = loadedPolynomialExpansion.transform(inputDataTable)[0];
+        verifyOutputResult(output, loadedPolynomialExpansion.getOutputCol(), EXPECTED_DENSE_OUTPUT);
+    }
+
+    @Test
+    public void testInvalidDegree() {
+        try {
+            PolynomialExpansion polynomialExpansion =
+                    new PolynomialExpansion()
+                            .setInputCol("denseVec")
+                            .setOutputCol("outputVec")
+                            .setDegree(-1);
+            polynomialExpansion.transform(inputDataTable);
+            fail();
+        } catch (Exception e) {
+            assertEquals("Parameter degree is given an invalid value -1", e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDenseTransform() throws Exception {
+        PolynomialExpansion polynomialExpansion =
+                new PolynomialExpansion()
+                        .setInputCol("denseVec")
+                        .setOutputCol("outputVec")
+                        .setDegree(3);
+
+        Table output = polynomialExpansion.transform(inputDataTable)[0];
+        verifyOutputResult(
+                output, polynomialExpansion.getOutputCol(), EXPECTED_DENSE_OUTPUT_WITH_DEGREE_3);
+    }
+
+    @Test
+    public void testSparseTransform() throws Exception {
+        PolynomialExpansion polynomialExpansion =
+                new PolynomialExpansion()
+                        .setInputCol("sparseVec")
+                        .setOutputCol("outputVec")
+                        .setDegree(3);
+
+        Table output = polynomialExpansion.transform(inputDataTable)[0];
+        verifyOutputResult(output, polynomialExpansion.getOutputCol(), EXPECTED_SPARSE_OUTPUT);
+    }
+}
diff --git a/flink-ml-python/pyflink/examples/ml/feature/polynomialexpansion_example.py b/flink-ml-python/pyflink/examples/ml/feature/polynomialexpansion_example.py
new file mode 100644
index 0000000..26aa27f
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/polynomialexpansion_example.py
@@ -0,0 +1,58 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a PolynomialExpansion instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.polynomialexpansion import PolynomialExpansion
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+    env.from_collection([
+        (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+        (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+    ],
+        type_info=Types.ROW_NAMED(
+            ['id', 'input_vec'],
+            [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a polynomial expansion object and initialize its parameters
+polynomialExpansion = PolynomialExpansion() \
+    .set_input_col('input_vec') \
+    .set_degree(2) \
+    .set_output_col('output_vec')
+
+# use the polynomial expansion model for feature engineering
+output = polynomialExpansion.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+    input_value = result[field_names.index(polynomialExpansion.get_input_col())]
+    output_value = result[field_names.index(polynomialExpansion.get_output_col())]
+    print('Input Value: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/ml/lib/feature/polynomialexpansion.py b/flink-ml-python/pyflink/ml/lib/feature/polynomialexpansion.py
new file mode 100644
index 0000000..e140dd9
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/polynomialexpansion.py
@@ -0,0 +1,75 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import typing
+
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.core.param import IntParam, ParamValidators
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, Param
+
+
+class _PolynomialExpansionParams(
+    JavaWithParams,
+    HasInputCol,
+    HasOutputCol
+):
+    """
+    Params for :class:`PolynomialExpansion`.
+    """
+
+    DEGREE: Param[int] = IntParam(
+        "degree",
+        "Degree of the polynomial expansion.",
+        2,
+        ParamValidators.gt_eq(1))
+
+    def __init__(self, java_params):
+        super(_PolynomialExpansionParams, self).__init__(java_params)
+
+    def set_degree(self, value: int):
+        return typing.cast(_PolynomialExpansionParams, self.set(self.DEGREE, value))
+
+    def get_degree(self) -> bool:
+        return self.get(self.DEGREE)
+
+    @property
+    def degree(self):
+        return self.get_degree()
+
+
+class PolynomialExpansion(JavaFeatureTransformer, _PolynomialExpansionParams):
+    """
+    A Transformer that expands the input vectors in polynomial space.
+
+    Take a 2-dimension vector as an example: `(x, y)`, if we want to expand it with degree 2, then
+    we get `(x, x * x, y, x * y, y * y)`.
+
+    For more information about the polynomial expansion, see
+    http://en.wikipedia.org/wiki/Polynomial_expansion.
+    """
+
+    def __init__(self, java_model=None):
+        super(PolynomialExpansion, self).__init__(java_model)
+
+    @classmethod
+    def _java_transformer_package_name(cls) -> str:
+        return "polynomialexpansion"
+
+    @classmethod
+    def _java_transformer_class_name(cls) -> str:
+        return "PolynomialExpansion"
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_polynomialexpansion.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_polynomialexpansion.py
new file mode 100644
index 0000000..ef54992
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_polynomialexpansion.py
@@ -0,0 +1,74 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.polynomialexpansion import PolynomialExpansion
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class PolynomialExpansionTest(PyFlinkMLTestCase):
+    def setUp(self):
+        super(PolynomialExpansionTest, self).setUp()
+        self.input_data_table = self.t_env.from_data_stream(
+            self.env.from_collection([
+                (Vectors.dense(1.0, 2.0),),
+                (Vectors.dense(2.0, 3.0),),
+            ],
+                type_info=Types.ROW_NAMED(
+                    ["intput_vec"],
+                    [DenseVectorTypeInfo()])))
+        self.expected_output_data = [
+            Vectors.dense(1.0, 1.0, 2.0, 2.0, 4.0),
+            Vectors.dense(2.0, 4.0, 3.0, 6.0, 9.0)]
+
+    def test_param(self):
+        polynomialexpansion = PolynomialExpansion()
+
+        self.assertEqual('input', polynomialexpansion.get_input_col())
+        self.assertEqual('output', polynomialexpansion.get_output_col())
+        self.assertEqual(2, polynomialexpansion.get_degree())
+
+        polynomialexpansion.set_input_col("intput_vec") \
+            .set_output_col('output_vec') \
+            .set_degree(3)
+
+        self.assertEqual("intput_vec", polynomialexpansion.get_input_col())
+        self.assertEqual(3, polynomialexpansion.get_degree())
+        self.assertEqual('output_vec', polynomialexpansion.get_output_col())
+
+    def test_save_load_transform(self):
+        polynomialexpansion = PolynomialExpansion() \
+            .set_input_col("intput_vec") \
+            .set_output_col('output_vec') \
+            .set_degree(2)
+
+        path = os.path.join(self.temp_dir, 'test_save_load_transform_polynomialexpansion')
+        polynomialexpansion.save(path)
+        polynomialexpansion = PolynomialExpansion.load(self.t_env, path)
+
+        output_table = polynomialexpansion.transform(self.input_data_table)[0]
+        actual_outputs = [(result[1]) for result in
+                          self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+        self.assertEqual(2, len(actual_outputs))
+        actual_outputs.sort(key=lambda x: (x[0], x[1], x[2], x[3], x[4]))
+        self.expected_output_data.sort(key=lambda x: (x[0], x[1], x[2], x[3], x[4]))
+        self.assertEqual(self.expected_output_data, actual_outputs)