You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2022/09/30 09:49:22 UTC
[flink-ml] branch master updated: [FLINK-29318] Add Transformer for PolynomialExpansion
This is an automated email from the ASF dual-hosted git repository.
zhangzp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 4a6ecde [FLINK-29318] Add Transformer for PolynomialExpansion
4a6ecde is described below
commit 4a6ecdee6cdc978e16333ac22b0f5b9083514049
Author: weibo <wb...@pku.edu.cn>
AuthorDate: Fri Sep 30 17:49:16 2022 +0800
[FLINK-29318] Add Transformer for PolynomialExpansion
This closes #155.
---
.../docs/operators/feature/polynomialexpansion.md | 160 ++++++++++++
.../feature/PolynomialExpansionExample.java | 66 +++++
.../polynomialexpansion/PolynomialExpansion.java | 289 +++++++++++++++++++++
.../PolynomialExpansionParams.java | 44 ++++
.../flink/ml/feature/PolynomialExpansionTest.java | 199 ++++++++++++++
.../ml/feature/polynomialexpansion_example.py | 58 +++++
.../pyflink/ml/lib/feature/polynomialexpansion.py | 75 ++++++
.../lib/feature/tests/test_polynomialexpansion.py | 74 ++++++
8 files changed, 965 insertions(+)
diff --git a/docs/content/docs/operators/feature/polynomialexpansion.md b/docs/content/docs/operators/feature/polynomialexpansion.md
new file mode 100644
index 0000000..06adaf8
--- /dev/null
+++ b/docs/content/docs/operators/feature/polynomialexpansion.md
@@ -0,0 +1,160 @@
+---
+title: "PolynomialExpansion"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/polynomialexpansion.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## PolynomialExpansion
+
+A Transformer that expands the input vectors in polynomial space.
+
+For example, expanding the 2-dimensional vector `(x, y)` with degree 2 yields
+`(x, x * x, y, x * y, y * y)`.
+
+For more information about polynomial expansion, see
+[Polynomial expansion](https://en.wikipedia.org/wiki/Polynomial_expansion).
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:----------|:------------------------|
+| inputCol | Vector | `"input"` | Vectors to be expanded. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:-----------|:------------------|
+| outputCol | Vector | `"output"` | Expanded vectors. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|:----------|:-----------|:--------|:---------|:------------------------------------|
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+| degree | `2` | Integer | no | Degree of the polynomial expansion. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.polynomialexpansion.PolynomialExpansion;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/**
+ * Simple program that creates a PolynomialExpansion instance and uses it for feature engineering.
+ */
+public class PolynomialExpansionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Builds a single-column table of dense vectors to be expanded.
+        DataStream<Row> rows =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.1, 3.1, 1.2)), Row.of(Vectors.dense(1.2, 3.1, 4.6)));
+        Table inputTable = tEnv.fromDataStream(rows).as("inputVec");
+
+        // Configures a degree-2 expansion that reads "inputVec" and writes "outputVec".
+        PolynomialExpansion expansion = new PolynomialExpansion();
+        expansion.setInputCol("inputVec");
+        expansion.setDegree(2);
+        expansion.setOutputCol("outputVec");
+
+        // Applies the transformer and takes its single output table.
+        Table expanded = expansion.transform(inputTable)[0];
+
+        // Prints every input vector next to its polynomial expansion.
+        CloseableIterator<Row> results = expanded.execute().collect();
+        while (results.hasNext()) {
+            Row current = results.next();
+            Vector before = (Vector) current.getField(expansion.getInputCol());
+            Vector after = (Vector) current.getField(expansion.getOutputCol());
+            System.out.printf("Input Value: %s \tOutput Value: %s\n", before, after);
+        }
+    }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a PolynomialExpansion instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.polynomialexpansion import PolynomialExpansion
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+ (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['id', 'input_vec'],
+ [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a polynomial expansion object and initialize its parameters
+polynomialExpansion = PolynomialExpansion() \
+ .set_input_col('input_vec') \
+ .set_degree(2) \
+ .set_output_col('output_vec')
+
+# use the polynomial expansion model for feature engineering
+output = polynomialExpansion.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(polynomialExpansion.get_input_col())]
+ output_value = result[field_names.index(polynomialExpansion.get_output_col())]
+ print('Input Value: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/PolynomialExpansionExample.java b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/PolynomialExpansionExample.java
new file mode 100644
index 0000000..e4e6e20
--- /dev/null
+++ b/flink-ml-examples/src/main/java/org/apache/flink/ml/examples/feature/PolynomialExpansionExample.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.examples.feature;
+
+import org.apache.flink.ml.feature.polynomialexpansion.PolynomialExpansion;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/**
+ * Simple program that creates a PolynomialExpansion instance and uses it for feature engineering.
+ */
+public class PolynomialExpansionExample {
+    public static void main(String[] args) {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        // Builds a single-column table of dense vectors to be expanded.
+        DataStream<Row> rows =
+                env.fromElements(
+                        Row.of(Vectors.dense(2.1, 3.1, 1.2)), Row.of(Vectors.dense(1.2, 3.1, 4.6)));
+        Table inputTable = tEnv.fromDataStream(rows).as("inputVec");
+
+        // Configures a degree-2 expansion that reads "inputVec" and writes "outputVec".
+        PolynomialExpansion expansion = new PolynomialExpansion();
+        expansion.setInputCol("inputVec");
+        expansion.setDegree(2);
+        expansion.setOutputCol("outputVec");
+
+        // Applies the transformer and takes its single output table.
+        Table expanded = expansion.transform(inputTable)[0];
+
+        // Prints every input vector next to its polynomial expansion.
+        CloseableIterator<Row> results = expanded.execute().collect();
+        while (results.hasNext()) {
+            Row current = results.next();
+            Vector before = (Vector) current.getField(expansion.getInputCol());
+            Vector after = (Vector) current.getField(expansion.getOutputCol());
+            System.out.printf("Input Value: %s \tOutput Value: %s\n", before, after);
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java
new file mode 100644
index 0000000..86fd2ea
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansion.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Transformer;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.typeinfo.VectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.math3.util.ArithmeticUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A Transformer that expands the input vectors in polynomial space.
+ *
+ * <p>For example, expanding the 2-dimensional vector {@code (x, y)} with degree 2 yields {@code
+ * (x, x * x, y, x * y, y * y)}. The constant term 1 is not part of the output.
+ *
+ * <p>For more information about polynomial expansion, see
+ * https://en.wikipedia.org/wiki/Polynomial_expansion.
+ */
+public class PolynomialExpansion
+        implements Transformer<PolynomialExpansion>,
+                PolynomialExpansionParams<PolynomialExpansion> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public PolynomialExpansion() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+
+        // The output schema is the input schema with one vector column appended.
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(inputTypeInfo.getFieldTypes(), VectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+
+        DataStream<Row> output =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                new PolynomialExpansionFunction(getDegree(), getInputCol()),
+                                outputTypeInfo);
+
+        Table outputTable = tEnv.fromDataStream(output);
+        return new Table[] {outputTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+    }
+
+    public static PolynomialExpansion load(StreamTableEnvironment env, String path)
+            throws IOException {
+        return ReadWriteUtils.loadStageParam(path);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    /**
+     * Polynomial expansion function that expands a vector in polynomial space. The expansion is
+     * done recursively. Given an input vector of size {@code n} and a degree {@code d}, the
+     * expanded vector has C(n + d, d) - 1 entries. These are all monomials of total degree 1 to
+     * {@code d}, i.e. every monomial except the constant term 1. For example, let f([a, b, c], 3)
+     * be the function that expands [a, b, c] to its monomials of degree at most 3. We have the
+     * following recursion:
+     *
+     * <blockquote>
+     *
+     * $$ f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3] $$
+     *
+     * </blockquote>
+     *
+     * <p>To handle sparsity, if c is zero, all monomials that contain it are skipped. The current
+     * output index is still advanced past the skipped block, so each emitted monomial lands at the
+     * position it would occupy in the dense expansion.
+     */
+    private static class PolynomialExpansionFunction implements MapFunction<Row, Row> {
+        private final int degree;
+        private final String inputCol;
+
+        public PolynomialExpansionFunction(int degree, String inputCol) {
+            this.degree = degree;
+            this.inputCol = inputCol;
+        }
+
+        /**
+         * Appends the expansion of the vector in {@code inputCol} to the row.
+         *
+         * @throws IllegalArgumentException if the input vector is null.
+         * @throws UnsupportedOperationException if the vector is neither dense nor sparse.
+         */
+        @Override
+        public Row map(Row row) throws Exception {
+            Vector vec = row.getFieldAs(inputCol);
+            if (vec == null) {
+                throw new IllegalArgumentException("The vector must not be null.");
+            }
+            Vector outputVec;
+            if (vec instanceof DenseVector) {
+                int size = vec.size();
+                // "- 1" drops the slot of the constant term, which is never emitted.
+                double[] retVals = new double[getResultVectorSize(size, degree) - 1];
+                expandDenseVector(((DenseVector) vec).values, size - 1, degree, 1.0, retVals, -1);
+                outputVec = new DenseVector(retVals);
+            } else if (vec instanceof SparseVector) {
+                SparseVector sparseVec = (SparseVector) vec;
+                int[] indices = sparseVec.indices;
+                double[] values = sparseVec.values;
+                int size = sparseVec.size();
+                int nnz = values.length;
+                // Upper bound on the number of emitted monomials. Fewer are emitted when a
+                // stored value, or a product of stored values, is zero.
+                int nnzPolySize = getResultVectorSize(nnz, degree);
+
+                Tuple2<Integer, int[]> polyIndices = Tuple2.of(0, new int[nnzPolySize - 1]);
+                Tuple2<Integer, double[]> polyValues = Tuple2.of(0, new double[nnzPolySize - 1]);
+                expandSparseVector(
+                        indices,
+                        values,
+                        nnz - 1,
+                        size - 1,
+                        degree,
+                        1.0,
+                        polyIndices,
+                        polyValues,
+                        -1);
+
+                // Only the first polyIndices.f0 slots were filled. Passing the untrimmed arrays
+                // would add spurious (index 0, value 0.0) entries after the real indices.
+                int numWritten = polyIndices.f0;
+                outputVec =
+                        new SparseVector(
+                                getResultVectorSize(size, degree) - 1,
+                                ArrayUtils.subarray(polyIndices.f1, 0, numWritten),
+                                ArrayUtils.subarray(polyValues.f1, 0, numWritten));
+            } else {
+                throw new UnsupportedOperationException(
+                        "Only supports DenseVector or SparseVector.");
+            }
+            return Row.join(row, Row.of(outputVec));
+        }
+
+        /**
+         * Calculates the binomial coefficient C(num + degree, degree). This is the number of
+         * monomials of total degree at most {@code degree} in {@code num} variables, including
+         * the constant term.
+         *
+         * @throws RuntimeException if the result exceeds {@link Integer#MAX_VALUE}. For large
+         *     arguments the overflow may also surface from {@link ArithmeticUtils#mulAndCheck}.
+         */
+        private static int getResultVectorSize(int num, int degree) {
+            if (num == 0) {
+                return 1;
+            }
+
+            if (num == 1 || degree == 1) {
+                return num + degree;
+            }
+
+            // C(num + degree, degree) == C(num + degree, num); iterate over the smaller one.
+            if (degree > num) {
+                return getResultVectorSize(degree, num);
+            }
+
+            long res = 1;
+            int i = num + 1;
+            int j;
+
+            if (num + degree < 61) {
+                // After step j, res == C(num + j, j), so res * i is always divisible by j.
+                // The intermediate product stays below Long.MAX_VALUE while num + degree < 61.
+                for (j = 1; j <= degree; ++j) {
+                    res = res * i / j;
+                    ++i;
+                }
+            } else {
+                // Divides by the common factor first and multiplies with an overflow check.
+                int depth;
+                for (j = 1; j <= degree; ++j) {
+                    depth = ArithmeticUtils.gcd(i, j);
+                    res = ArithmeticUtils.mulAndCheck(res / (j / depth), i / depth);
+                    ++i;
+                }
+            }
+
+            if (res > Integer.MAX_VALUE) {
+                throw new RuntimeException("The expended polynomial size is too large.");
+            }
+            return (int) res;
+        }
+
+        /**
+         * Expands {@code values[0..lastIdx]} to degree {@code degree}, scales every monomial by
+         * {@code factor}, and writes the results into {@code retValues}.
+         *
+         * <p>This call owns the slots starting at {@code curPolyIdx}. Slot -1 stands for the
+         * global constant term and is never written. Zero values prune their whole sub-tree, and
+         * the corresponding slots keep their initial 0.0.
+         *
+         * @return the first slot after the range owned by this call.
+         */
+        private static int expandDenseVector(
+                double[] values,
+                int lastIdx,
+                int degree,
+                double factor,
+                double[] retValues,
+                int curPolyIdx) {
+            if (factor != 0.0) {
+                if (degree == 0 || lastIdx < 0) {
+                    if (curPolyIdx >= 0) {
+                        retValues[curPolyIdx] = factor;
+                    }
+                } else {
+                    double v = values[lastIdx];
+                    int newLastIdx = lastIdx - 1;
+                    double alpha = factor;
+                    int i = 0;
+                    int curStart = curPolyIdx;
+
+                    // Term i multiplies the expansion of the remaining prefix by v^i.
+                    while (i <= degree && Math.abs(alpha) > 0.0) {
+                        curStart =
+                                expandDenseVector(
+                                        values, newLastIdx, degree - i, alpha, retValues, curStart);
+                        i += 1;
+                        alpha *= v;
+                    }
+                }
+            }
+            return curPolyIdx + getResultVectorSize(lastIdx + 1, degree);
+        }
+
+        /**
+         * Sparse counterpart of {@link #expandDenseVector}. It expands the stored entries
+         * {@code indices/values[0..lastIdx]}, which cover features {@code 0..lastFeatureIdx}.
+         * Each emitted (index, value) pair is appended to {@code polyIndices.f1} and {@code
+         * polyValues.f1}, and the counters {@code f0} are advanced.
+         *
+         * <p>Slot ranges are computed from feature indices rather than storage positions, so
+         * output indices match the dense layout.
+         *
+         * @return the first slot after the range owned by this call.
+         */
+        private static int expandSparseVector(
+                int[] indices,
+                double[] values,
+                int lastIdx,
+                int lastFeatureIdx,
+                int degree,
+                double factor,
+                Tuple2<Integer, int[]> polyIndices,
+                Tuple2<Integer, double[]> polyValues,
+                int curPolyIdx) {
+            if (factor != 0.0) {
+                if (degree == 0 || lastIdx < 0) {
+                    if (curPolyIdx >= 0) {
+                        polyIndices.f1[polyIndices.f0] = curPolyIdx;
+                        polyValues.f1[polyValues.f0] = factor;
+                        polyIndices.f0++;
+                        polyValues.f0++;
+                    }
+                } else {
+                    double v = values[lastIdx];
+                    int lastIdx1 = lastIdx - 1;
+                    // Features between indices[lastIdx - 1] and indices[lastIdx] are implicit
+                    // zeros; the sub-call covers every feature below indices[lastIdx].
+                    int lastFeatureIdx1 = indices[lastIdx] - 1;
+                    double alpha = factor;
+                    int curStart = curPolyIdx;
+                    int i = 0;
+
+                    while (i <= degree && Math.abs(alpha) > 0.0) {
+                        curStart =
+                                expandSparseVector(
+                                        indices,
+                                        values,
+                                        lastIdx1,
+                                        lastFeatureIdx1,
+                                        degree - i,
+                                        alpha,
+                                        polyIndices,
+                                        polyValues,
+                                        curStart);
+                        i++;
+                        alpha *= v;
+                    }
+                }
+            }
+            return curPolyIdx + getResultVectorSize(lastFeatureIdx + 1, degree);
+        }
+    }
+}
diff --git a/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansionParams.java b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansionParams.java
new file mode 100644
index 0000000..abbc983
--- /dev/null
+++ b/flink-ml-lib/src/main/java/org/apache/flink/ml/feature/polynomialexpansion/PolynomialExpansionParams.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.polynomialexpansion;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link PolynomialExpansion}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface PolynomialExpansionParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+ Param<Integer> DEGREE =
+ new IntParam(
+ "degree", "Degree of the polynomial expansion.", 2, ParamValidators.gtEq(1));
+
+ default int getDegree() {
+ return get(DEGREE);
+ }
+
+ default T setDegree(Integer value) {
+ return set(DEGREE, value);
+ }
+}
diff --git a/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
new file mode 100644
index 0000000..a61e7d8
--- /dev/null
+++ b/flink-ml-lib/src/test/java/org/apache/flink/ml/feature/PolynomialExpansionTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.polynomialexpansion.PolynomialExpansion;
+import org.apache.flink.ml.linalg.Vector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/** Tests {@link PolynomialExpansion}. */
+public class PolynomialExpansionTest extends AbstractTestBase {
+
+ private StreamTableEnvironment tEnv;
+ private Table inputDataTable;
+
+ private static final List<Row> INPUT_DATA =
+ Arrays.asList(
+ Row.of(
+ Vectors.dense(1.0, 2.0, 3.0),
+ Vectors.sparse(5, new int[] {1, 4}, new double[] {2.0, 3.0})),
+ Row.of(
+ Vectors.dense(2.0, 3.0),
+ Vectors.sparse(5, new int[] {1, 4}, new double[] {2.0, 1.0})));
+
+ private static final List<Vector> EXPECTED_DENSE_OUTPUT =
+ Arrays.asList(
+ Vectors.dense(1.0, 1.0, 2.0, 2.0, 4.0, 3.0, 3.0, 6.0, 9.0),
+ Vectors.dense(2.0, 4.0, 3.0, 6.0, 9.0));
+
+ private static final List<Vector> EXPECTED_DENSE_OUTPUT_WITH_DEGREE_3 =
+ Arrays.asList(
+ Vectors.dense(
+ 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 4.0, 4.0, 8.0, 3.0, 3.0, 3.0, 6.0, 6.0,
+ 12.0, 9.0, 9.0, 18.0, 27.0),
+ Vectors.dense(2.0, 4.0, 8.0, 3.0, 6.0, 12.0, 9.0, 18.0, 27.0));
+
+ private static final List<Vector> EXPECTED_SPARSE_OUTPUT =
+ Arrays.asList(
+ Vectors.sparse(
+ 55,
+ new int[] {3, 6, 8, 34, 37, 39, 49, 51, 54},
+ new double[] {2.0, 4.0, 8.0, 3.0, 6.0, 12.0, 9.0, 18.0, 27.0}),
+ Vectors.sparse(
+ 55,
+ new int[] {3, 6, 8, 34, 37, 39, 49, 51, 54},
+ new double[] {2.0, 4.0, 8.0, 1.0, 2.0, 4.0, 1.0, 2.0, 1.0}));
+
+ @Before
+ public void before() {
+ Configuration config = new Configuration();
+ config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+
+ env.setParallelism(4);
+ env.enableCheckpointing(100);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+
+ tEnv = StreamTableEnvironment.create(env);
+ DataStream<Row> dataStream = env.fromCollection(INPUT_DATA);
+ inputDataTable = tEnv.fromDataStream(dataStream).as("denseVec", "sparseVec");
+ }
+
+ private void verifyOutputResult(Table output, String outputCol, List<Vector> expectedData)
+ throws Exception {
+ StreamTableEnvironment tEnv =
+ (StreamTableEnvironment) ((TableImpl) output).getTableEnvironment();
+ DataStream<Row> stream = tEnv.toDataStream(output);
+
+ List<Row> results = IteratorUtils.toList(stream.executeAndCollect());
+ List<Vector> resultVec = new ArrayList<>(results.size());
+ for (Row row : results) {
+ if (row.getField(outputCol) != null) {
+ resultVec.add(row.getFieldAs(outputCol));
+ }
+ }
+ compareResultCollections(expectedData, resultVec, TestUtils::compare);
+ }
+
+ @Test
+ public void testParam() {
+ PolynomialExpansion polynomialExpansion = new PolynomialExpansion();
+ assertEquals("input", polynomialExpansion.getInputCol());
+ assertEquals("output", polynomialExpansion.getOutputCol());
+ assertEquals(2, polynomialExpansion.getDegree());
+
+ polynomialExpansion.setInputCol("denseVec").setOutputCol("outputVec").setDegree(5);
+ assertEquals("denseVec", polynomialExpansion.getInputCol());
+ assertEquals("outputVec", polynomialExpansion.getOutputCol());
+ assertEquals(5, polynomialExpansion.getDegree());
+ }
+
+ @Test
+ public void testOutputSchema() {
+ PolynomialExpansion polynomialExpansion =
+ new PolynomialExpansion()
+ .setInputCol("denseVec")
+ .setOutputCol("outputVec")
+ .setDegree(3);
+
+ Table output = polynomialExpansion.transform(inputDataTable)[0];
+
+ assertEquals(
+ Arrays.asList("denseVec", "sparseVec", "outputVec"),
+ output.getResolvedSchema().getColumnNames());
+ }
+
+ @Test
+ public void testSaveLoadAndTransform() throws Exception {
+ PolynomialExpansion polynomialExpansion =
+ new PolynomialExpansion()
+ .setInputCol("denseVec")
+ .setOutputCol("outputVec")
+ .setDegree(2);
+
+ PolynomialExpansion loadedPolynomialExpansion =
+ TestUtils.saveAndReload(
+ tEnv, polynomialExpansion, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
+
+ Table output = loadedPolynomialExpansion.transform(inputDataTable)[0];
+ verifyOutputResult(output, loadedPolynomialExpansion.getOutputCol(), EXPECTED_DENSE_OUTPUT);
+ }
+
+ @Test
+ public void testInvalidDegree() {
+ try {
+ PolynomialExpansion polynomialExpansion =
+ new PolynomialExpansion()
+ .setInputCol("denseVec")
+ .setOutputCol("outputVec")
+ .setDegree(-1);
+ polynomialExpansion.transform(inputDataTable);
+ fail();
+ } catch (Exception e) {
+ assertEquals("Parameter degree is given an invalid value -1", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testDenseTransform() throws Exception {
+ PolynomialExpansion polynomialExpansion =
+ new PolynomialExpansion()
+ .setInputCol("denseVec")
+ .setOutputCol("outputVec")
+ .setDegree(3);
+
+ Table output = polynomialExpansion.transform(inputDataTable)[0];
+ verifyOutputResult(
+ output, polynomialExpansion.getOutputCol(), EXPECTED_DENSE_OUTPUT_WITH_DEGREE_3);
+ }
+
+ @Test
+ public void testSparseTransform() throws Exception {
+ PolynomialExpansion polynomialExpansion =
+ new PolynomialExpansion()
+ .setInputCol("sparseVec")
+ .setOutputCol("outputVec")
+ .setDegree(3);
+
+ Table output = polynomialExpansion.transform(inputDataTable)[0];
+ verifyOutputResult(output, polynomialExpansion.getOutputCol(), EXPECTED_SPARSE_OUTPUT);
+ }
+}
diff --git a/flink-ml-python/pyflink/examples/ml/feature/polynomialexpansion_example.py b/flink-ml-python/pyflink/examples/ml/feature/polynomialexpansion_example.py
new file mode 100644
index 0000000..26aa27f
--- /dev/null
+++ b/flink-ml-python/pyflink/examples/ml/feature/polynomialexpansion_example.py
@@ -0,0 +1,58 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Simple program that creates a PolynomialExpansion instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.polynomialexpansion import PolynomialExpansion
+from pyflink.table import StreamTableEnvironment
+
+# create a new StreamExecutionEnvironment
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# create a StreamTableEnvironment
+t_env = StreamTableEnvironment.create(env)
+
+# generate input data
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ (1, Vectors.dense(2.1, 3.1, 1.2, 2.1)),
+ (2, Vectors.dense(2.3, 2.1, 1.3, 1.2)),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['id', 'input_vec'],
+ [Types.INT(), DenseVectorTypeInfo()])))
+
+# create a polynomial expansion object and initialize its parameters
+polynomialExpansion = PolynomialExpansion() \
+ .set_input_col('input_vec') \
+ .set_degree(2) \
+ .set_output_col('output_vec')
+
+# use the polynomial expansion model for feature engineering
+output = polynomialExpansion.transform(input_data_table)[0]
+
+# extract and display the results
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(polynomialExpansion.get_input_col())]
+ output_value = result[field_names.index(polynomialExpansion.get_output_col())]
+ print('Input Value: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/ml/lib/feature/polynomialexpansion.py b/flink-ml-python/pyflink/ml/lib/feature/polynomialexpansion.py
new file mode 100644
index 0000000..e140dd9
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/polynomialexpansion.py
@@ -0,0 +1,75 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import typing
+
+from pyflink.ml.core.wrapper import JavaWithParams
+from pyflink.ml.core.param import IntParam, ParamValidators
+from pyflink.ml.lib.feature.common import JavaFeatureTransformer
+from pyflink.ml.lib.param import HasInputCol, HasOutputCol, Param
+
+
+class _PolynomialExpansionParams(
+ JavaWithParams,
+ HasInputCol,
+ HasOutputCol
+):
+ """
+ Params for :class:`PolynomialExpansion`.
+ """
+
+ DEGREE: Param[int] = IntParam(
+ "degree",
+ "Degree of the polynomial expansion.",
+ 2,
+ ParamValidators.gt_eq(1))
+
+ def __init__(self, java_params):
+ super(_PolynomialExpansionParams, self).__init__(java_params)
+
+ def set_degree(self, value: int):
+ return typing.cast(_PolynomialExpansionParams, self.set(self.DEGREE, value))
+
+ def get_degree(self) -> bool:
+ return self.get(self.DEGREE)
+
+ @property
+ def degree(self):
+ return self.get_degree()
+
+
+class PolynomialExpansion(JavaFeatureTransformer, _PolynomialExpansionParams):
+ """
+ A Transformer that expands the input vectors in polynomial space.
+
+ Take a 2-dimension vector as an example: `(x, y)`, if we want to expand it with degree 2, then
+ we get `(x, x * x, y, x * y, y * y)`.
+
+ For more information about the polynomial expansion, see
+ http://en.wikipedia.org/wiki/Polynomial_expansion.
+ """
+
+ def __init__(self, java_model=None):
+ super(PolynomialExpansion, self).__init__(java_model)
+
+ @classmethod
+ def _java_transformer_package_name(cls) -> str:
+ return "polynomialexpansion"
+
+ @classmethod
+ def _java_transformer_class_name(cls) -> str:
+ return "PolynomialExpansion"
diff --git a/flink-ml-python/pyflink/ml/lib/feature/tests/test_polynomialexpansion.py b/flink-ml-python/pyflink/ml/lib/feature/tests/test_polynomialexpansion.py
new file mode 100644
index 0000000..ef54992
--- /dev/null
+++ b/flink-ml-python/pyflink/ml/lib/feature/tests/test_polynomialexpansion.py
@@ -0,0 +1,74 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+
+from pyflink.common import Types
+
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.ml.lib.feature.polynomialexpansion import PolynomialExpansion
+from pyflink.ml.tests.test_utils import PyFlinkMLTestCase
+
+
+class PolynomialExpansionTest(PyFlinkMLTestCase):
+ def setUp(self):
+ super(PolynomialExpansionTest, self).setUp()
+ self.input_data_table = self.t_env.from_data_stream(
+ self.env.from_collection([
+ (Vectors.dense(1.0, 2.0),),
+ (Vectors.dense(2.0, 3.0),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ["intput_vec"],
+ [DenseVectorTypeInfo()])))
+ self.expected_output_data = [
+ Vectors.dense(1.0, 1.0, 2.0, 2.0, 4.0),
+ Vectors.dense(2.0, 4.0, 3.0, 6.0, 9.0)]
+
+ def test_param(self):
+ polynomialexpansion = PolynomialExpansion()
+
+ self.assertEqual('input', polynomialexpansion.get_input_col())
+ self.assertEqual('output', polynomialexpansion.get_output_col())
+ self.assertEqual(2, polynomialexpansion.get_degree())
+
+ polynomialexpansion.set_input_col("intput_vec") \
+ .set_output_col('output_vec') \
+ .set_degree(3)
+
+ self.assertEqual("intput_vec", polynomialexpansion.get_input_col())
+ self.assertEqual(3, polynomialexpansion.get_degree())
+ self.assertEqual('output_vec', polynomialexpansion.get_output_col())
+
+ def test_save_load_transform(self):
+ polynomialexpansion = PolynomialExpansion() \
+ .set_input_col("intput_vec") \
+ .set_output_col('output_vec') \
+ .set_degree(2)
+
+ path = os.path.join(self.temp_dir, 'test_save_load_transform_polynomialexpansion')
+ polynomialexpansion.save(path)
+ polynomialexpansion = PolynomialExpansion.load(self.t_env, path)
+
+ output_table = polynomialexpansion.transform(self.input_data_table)[0]
+ actual_outputs = [(result[1]) for result in
+ self.t_env.to_data_stream(output_table).execute_and_collect()]
+
+ self.assertEqual(2, len(actual_outputs))
+ actual_outputs.sort(key=lambda x: (x[0], x[1], x[2], x[3], x[4]))
+ self.expected_output_data.sort(key=lambda x: (x[0], x[1], x[2], x[3], x[4]))
+ self.assertEqual(self.expected_output_data, actual_outputs)