You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by li...@apache.org on 2022/09/05 00:43:20 UTC
[flink-ml] branch master updated: [FLINK-29175] Add documents for KBinsDiscretizer, RegexTokenizer, Tokenizer, VectorIndexer and AgglomerativeClustering
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/master by this push:
new 2f1620e [FLINK-29175] Add documents for KBinsDiscretizer, RegexTokenizer, Tokenizer, VectorIndexer and AgglomerativeClustering
2f1620e is described below
commit 2f1620e24ee648346319355549d1858724ec0163
Author: Zhipeng Zhang <zh...@gmail.com>
AuthorDate: Mon Sep 5 08:43:15 2022 +0800
[FLINK-29175] Add documents for KBinsDiscretizer, RegexTokenizer, Tokenizer, VectorIndexer and AgglomerativeClustering
This closes #152.
---
.../clustering/agglomerativeclustering.md | 24 +--
.../docs/operators/feature/kbinsdiscretizer.md | 185 ++++++++++++++++++
.../docs/operators/feature/regextokenizer.md | 156 +++++++++++++++
docs/content/docs/operators/feature/tokenizer.md | 148 +++++++++++++++
.../docs/operators/feature/vectorindexer.md | 210 +++++++++++++++++++++
.../examples/ml/feature/hashingtf_example.py | 2 +-
.../ml/feature/indextostringmodel_example.py | 2 +-
.../ml/feature/kbinsdiscreteizer_example.py | 2 +-
.../examples/ml/feature/regextokenizer_example.py | 2 +-
.../examples/ml/feature/tokenizer_example.py | 4 +-
.../examples/ml/feature/vectorindexer_example.py | 2 +-
11 files changed, 718 insertions(+), 19 deletions(-)
diff --git a/docs/content/docs/operators/clustering/agglomerativeclustering.md b/docs/content/docs/operators/clustering/agglomerativeclustering.md
index 1803c87..c2988b5 100644
--- a/docs/content/docs/operators/clustering/agglomerativeclustering.md
+++ b/docs/content/docs/operators/clustering/agglomerativeclustering.md
@@ -49,15 +49,15 @@ format of the merging information is
### Parameters
-| Key | Default | Type | Required | Description |
-|:------------------|:---------------|:--------|:---------|:-----------------------------------------------------------|
-| numClusters | `2` | Integer | no | The max number of clusters to create. |
-| distanceThreshold | `null` | Double | no | Threshold to decide whether two clusters should be merged. |
-| linkage | `"ward"` | String | no | Criterion for computing distance between two clusters. |
-| computeFullTree | `false` | Boolean | no | Whether computes the full tree after convergence. |
-| distanceMeasure | `"euclidean"` | String | no | Distance measure. |
-| featuresCol | `"features"` | String | no | Features column name. |
-| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| Key | Default | Type | Required | Description |
+|:------------------|:---------------|:--------|:---------|:--------------------------------------------------------------------------------------------------------------------|
+| numClusters | `2` | Integer | no | The max number of clusters to create. |
+| distanceThreshold | `null` | Double | no | Threshold to decide whether two clusters should be merged. |
+| linkage | `"ward"` | String | no | Criterion for computing distance between two clusters. Supported values: `'ward', 'complete', 'single', 'average'`. |
+| computeFullTree | `false` | Boolean | no | Whether to compute the full tree after convergence. |
+| distanceMeasure | `"euclidean"` | String | no | Distance measure. Supported values: `'euclidean', 'manhattan', 'cosine'`. |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
### Examples
@@ -152,9 +152,9 @@ input_data = t_env.from_data_stream(
[DenseVectorTypeInfo()])))
# Creates an AgglomerativeClustering object and initializes its parameters.
-agglomerative_clustering = AgglomerativeClustering()
- .set_linkage('ward')
- .set_distance_measure('euclidean')
+agglomerative_clustering = AgglomerativeClustering() \
+ .set_linkage('ward') \
+ .set_distance_measure('euclidean') \
.set_prediction_col('prediction')
# Uses the AgglomerativeClustering for clustering.
diff --git a/docs/content/docs/operators/feature/kbinsdiscretizer.md b/docs/content/docs/operators/feature/kbinsdiscretizer.md
new file mode 100644
index 0000000..f449752
--- /dev/null
+++ b/docs/content/docs/operators/feature/kbinsdiscretizer.md
@@ -0,0 +1,185 @@
+---
+title: "KBinsDiscretizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/kbinsdiscretizer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## KBinsDiscretizer
+
+KBinsDiscretizer is an algorithm that implements discretization (also known as
+quantization or binning) to transform continuous features into discrete ones.
+The output values are in [0, numBins).
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:------------|:----------|:---------------------------|
+| inputCol | DenseVector | `"input"` | Vectors to be discretized. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:------------|:-----------|:---------------------|
+| outputCol | DenseVector | `"output"` | Discretized vectors. |
+
+### Parameters
+
+Below are the parameters required by `KBinsDiscretizerModel`.
+
+| Key | Default | Type | Required | Description |
+|:----------|:-----------|:-------|:---------|:--------------------|
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+
+In addition to the parameters above, `KBinsDiscretizer` also accepts the following parameters.
+
+| Key | Default | Type | Required | Description |
+|:-----------|:-------------|:--------|:---------|:-------------------------------------------------------------------------------------------------|
+| strategy | `"quantile"` | String | no | Strategy used to define the width of the bin. Supported values: `'uniform', 'quantile', 'kmeans'`. |
+| numBins | `5` | Integer | no | Number of bins to produce. |
+| subSamples | `200000` | Integer | no | Maximum number of samples used to fit the model. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizer;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerModel;
+import org.apache.flink.ml.feature.kbinsdiscretizer.KBinsDiscretizerParams;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+/** Simple program that trains a KBinsDiscretizer model and uses it for feature engineering. */
+public class KBinsDiscretizerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data: nine rows, each holding a 3-dimensional dense vector.
+ DataStream<Row> inputStream =
+ env.fromElements(
+ Row.of(Vectors.dense(1, 10, 0)),
+ Row.of(Vectors.dense(1, 10, 0)),
+ Row.of(Vectors.dense(1, 10, 0)),
+ Row.of(Vectors.dense(4, 10, 0)),
+ Row.of(Vectors.dense(5, 10, 0)),
+ Row.of(Vectors.dense(6, 10, 0)),
+ Row.of(Vectors.dense(7, 10, 0)),
+ Row.of(Vectors.dense(10, 10, 0)),
+ Row.of(Vectors.dense(13, 10, 3)));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+ // Creates a KBinsDiscretizer with 3 bins and the uniform binning strategy.
+ KBinsDiscretizer kBinsDiscretizer =
+ new KBinsDiscretizer().setNumBins(3).setStrategy(KBinsDiscretizerParams.UNIFORM);
+
+ // Trains the KBinsDiscretizer model on the input data.
+ KBinsDiscretizerModel model = kBinsDiscretizer.fit(inputTable);
+
+ // Uses the trained model to discretize the same input data.
+ Table outputTable = model.transform(inputTable)[0];
+
+ // Collects the output rows and prints each input vector with its discretized vector.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ DenseVector inputValue = (DenseVector) row.getField(kBinsDiscretizer.getInputCol());
+ DenseVector outputValue = (DenseVector) row.getField(kBinsDiscretizer.getOutputCol());
+ System.out.printf("Input Value: %s\tOutput Value: %s\n", inputValue, outputValue);
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a KBinsDiscretizer model and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.kbinsdiscretizer import KBinsDiscretizer
+from pyflink.table import StreamTableEnvironment
+
+# Creates a new StreamExecutionEnvironment.
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# Creates a StreamTableEnvironment.
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input for training and prediction.
+input_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(1, 10, 0),),
+ (Vectors.dense(1, 10, 0),),
+ (Vectors.dense(1, 10, 0),),
+ (Vectors.dense(4, 10, 0),),
+ (Vectors.dense(5, 10, 0),),
+ (Vectors.dense(6, 10, 0),),
+ (Vectors.dense(7, 10, 0),),
+ (Vectors.dense(10, 10, 0),),
+ (Vectors.dense(13, 10, 0),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input', ],
+ [DenseVectorTypeInfo(), ])))
+
+# Creates a KBinsDiscretizer object and initializes its parameters.
+k_bins_discretizer = KBinsDiscretizer() \
+ .set_input_col('input') \
+ .set_output_col('output') \
+ .set_num_bins(3) \
+ .set_strategy('uniform')
+
+# Trains the KBinsDiscretizer Model.
+model = k_bins_discretizer.fit(input_table)
+
+# Uses the KBinsDiscretizer Model for predictions.
+output = model.transform(input_table)[0]
+
+# Extracts and displays the results.
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ print('Input Value: ' + str(result[field_names.index(k_bins_discretizer.get_input_col())])
+ + '\tOutput Value: ' +
+ str(result[field_names.index(k_bins_discretizer.get_output_col())]))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content/docs/operators/feature/regextokenizer.md b/docs/content/docs/operators/feature/regextokenizer.md
new file mode 100644
index 0000000..942d01f
--- /dev/null
+++ b/docs/content/docs/operators/feature/regextokenizer.md
@@ -0,0 +1,156 @@
+---
+title: "RegexTokenizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/regextokenizer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## RegexTokenizer
+
+RegexTokenizer is an algorithm that converts the input string to lowercase (if toLowercase is true)
+and then splits it into tokens based on a regex, which by default splits on white spaces.
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:----------|:-------------------------|
+| inputCol | String | `"input"` | Strings to be tokenized. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:---------|:-----------|:-------------------|
+| outputCol | String[] | `"output"` | Tokenized Strings. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|:---------------|:-----------|:--------|:---------|:------------------------------------------------------------------|
+| minTokenLength | `1` | Integer | no | Minimum token length. |
+| gaps | `true` | Boolean | no | Whether the regex matches the gaps between tokens (true) or the tokens themselves (false). |
+| pattern | `"\s+"` | String | no | Regex pattern used for tokenizing. |
+| toLowercase | `true` | Boolean | no | Whether to convert all characters to lowercase before tokenizing. |
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.feature.regextokenizer.RegexTokenizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a RegexTokenizer instance and uses it for feature engineering. */
+public class RegexTokenizerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data.
+ DataStream<Row> inputStream =
+ env.fromElements(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+ // Creates a RegexTokenizer whose pattern matches tokens (not gaps), hence setGaps(false).
+ RegexTokenizer regexTokenizer =
+ new RegexTokenizer()
+ .setInputCol("input")
+ .setOutputCol("output")
+ .setGaps(false)
+ .setPattern("\\w+|\\p{Punct}");
+
+ // Uses the RegexTokenizer object for feature transformations.
+ Table outputTable = regexTokenizer.transform(inputTable)[0];
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ String inputValue = (String) row.getField(regexTokenizer.getInputCol());
+ String[] outputValues = (String[]) row.getField(regexTokenizer.getOutputCol());
+
+ System.out.printf(
+ "Input Value: %s \tOutput Values: %s\n",
+ inputValue, Arrays.toString(outputValues));
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a RegexTokenizer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.regextokenizer import RegexTokenizer
+from pyflink.table import StreamTableEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data.
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ ('Test for tokenization.',),
+ ('Te,st. punct',),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [Types.STRING()])))
+
+# Creates a RegexTokenizer object and initializes its parameters.
+regex_tokenizer = RegexTokenizer() \
+ .set_input_col("input") \
+ .set_output_col("output")
+
+# Uses the Tokenizer object for feature transformations.
+output = regex_tokenizer.transform(input_data_table)[0]
+
+# Extracts and displays the results.
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(regex_tokenizer.get_input_col())]
+ output_value = result[field_names.index(regex_tokenizer.get_output_col())]
+ print('Input Values: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content/docs/operators/feature/tokenizer.md b/docs/content/docs/operators/feature/tokenizer.md
new file mode 100644
index 0000000..8120260
--- /dev/null
+++ b/docs/content/docs/operators/feature/tokenizer.md
@@ -0,0 +1,148 @@
+---
+title: "Tokenizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/tokenizer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Tokenizer
+
+Tokenizer is an algorithm that converts the input string
+to lowercase and then splits it by white spaces.
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:----------|:-------------------------|
+| inputCol | String | `"input"` | Strings to be tokenized. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:---------|:-----------|:-------------------|
+| outputCol | String[] | `"output"` | Tokenized strings. |
+
+### Parameters
+
+| Key | Default | Type | Required | Description |
+|:----------|:-----------|:-------|:---------|:--------------------|
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+
+import org.apache.flink.ml.feature.tokenizer.Tokenizer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+
+/** Simple program that creates a Tokenizer instance and uses it for feature engineering. */
+public class TokenizerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates input data: one sentence per row, in a column named "input".
+ DataStream<Row> inputStream =
+ env.fromElements(Row.of("Test for tokenization."), Row.of("Te,st. punct"));
+ Table inputTable = tEnv.fromDataStream(inputStream).as("input");
+
+ // Creates a Tokenizer that reads column "input" and writes tokens to column "output".
+ Tokenizer tokenizer = new Tokenizer().setInputCol("input").setOutputCol("output");
+
+ // Uses the Tokenizer object for feature transformations.
+ Table outputTable = tokenizer.transform(inputTable)[0];
+
+ // Collects the output rows and prints each input string with its tokens.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+
+ String inputValue = (String) row.getField(tokenizer.getInputCol());
+ String[] outputValues = (String[]) row.getField(tokenizer.getOutputCol());
+
+ System.out.printf(
+ "Input Value: %s \tOutput Values: %s\n",
+ inputValue, Arrays.toString(outputValues));
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that creates a Tokenizer instance and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.tokenizer import Tokenizer
+from pyflink.table import StreamTableEnvironment
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+t_env = StreamTableEnvironment.create(env)
+
+# Generates input data: one sentence per row, in a column named 'input'.
+input_data_table = t_env.from_data_stream(
+ env.from_collection([
+ ('Test for tokenization.',),
+ ('Te,st. punct',),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input'],
+ [Types.STRING()])))
+
+# Creates a Tokenizer that reads column 'input' and writes tokens to column 'output'.
+tokenizer = Tokenizer() \
+ .set_input_col("input") \
+ .set_output_col("output")
+
+# Uses the Tokenizer object for feature transformations.
+output = tokenizer.transform(input_data_table)[0]
+
+# Collects the output rows and prints each input string with its tokens.
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ input_value = result[field_names.index(tokenizer.get_input_col())]
+ output_value = result[field_names.index(tokenizer.get_output_col())]
+ print('Input Value: ' + str(input_value) + '\tOutput Values: ' + str(output_value))
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/docs/content/docs/operators/feature/vectorindexer.md b/docs/content/docs/operators/feature/vectorindexer.md
new file mode 100644
index 0000000..0738e7f
--- /dev/null
+++ b/docs/content/docs/operators/feature/vectorindexer.md
@@ -0,0 +1,210 @@
+---
+title: "VectorIndexer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/vectorindexer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## VectorIndexer
+
+VectorIndexer is an algorithm that indexes categorical
+features in vectors. It maps each column of the input
+vector into either a continuous or a categorical feature.
+Whether one feature is transformed into a continuous or
+categorical feature depends on the number of distinct
+values in this column. If the number of distinct values
+in one column is greater than a specified parameter
+(i.e., maxCategories), the corresponding output column
+is unchanged. Otherwise, it is transformed into a
+categorical value. For categorical outputs, the indices
+are in [0, numDistinctValuesInThisColumn].
+
+Categorical values are indexed in ascending order, except
+that 0.0 is always mapped to index 0 (to preserve sparsity).
+
+### Input Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:----------|:-----------------------|
+| inputCol | Vector | `"input"` | Vectors to be indexed. |
+
+### Output Columns
+
+| Param name | Type | Default | Description |
+|:-----------|:-------|:-----------|:-----------------|
+| outputCol | Vector | `"output"` | Indexed vectors. |
+
+### Parameters
+
+Below are the parameters required by `VectorIndexerModel`.
+
+| Key | Default | Type | Required | Description |
+|:--------------|:-----------|:-------|:---------|:---------------------------------------------------------------------------------|
+| inputCol | `"input"` | String | no | Input column name. |
+| outputCol | `"output"` | String | no | Output column name. |
+| handleInvalid | `"error"` | String | no | Strategy to handle invalid entries. Supported values: `'error', 'skip', 'keep'`. |
+
+In addition to the parameters above, `VectorIndexer` also accepts the following parameters.
+
+| Key | Default | Type | Required | Description |
+|:--------------|:--------|:--------|:---------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| maxCategories | `20` | Integer | no | Threshold for the number of values a categorical feature can take (>= 2). If a feature is found to have > maxCategories values, then it is declared continuous. |
+
+### Examples
+
+{{< tabs examples >}}
+
+{{< tab "Java">}}
+
+```java
+import org.apache.flink.ml.common.param.HasHandleInvalid;
+import org.apache.flink.ml.feature.vectorindexer.VectorIndexer;
+import org.apache.flink.ml.feature.vectorindexer.VectorIndexerModel;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Simple program that creates a VectorIndexer instance and uses it for feature engineering. */
+public class VectorIndexerExample {
+ public static void main(String[] args) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ // Generates training and prediction data; the prediction data contains a value (2) unseen in training.
+ List<Row> trainInput =
+ Arrays.asList(
+ Row.of(Vectors.dense(1, 1)),
+ Row.of(Vectors.dense(2, -1)),
+ Row.of(Vectors.dense(3, 1)),
+ Row.of(Vectors.dense(4, 0)),
+ Row.of(Vectors.dense(5, 0)));
+
+ List<Row> predictInput =
+ Arrays.asList(
+ Row.of(Vectors.dense(0, 2)),
+ Row.of(Vectors.dense(0, 0)),
+ Row.of(Vectors.dense(0, -1)));
+
+ Table trainTable = tEnv.fromDataStream(env.fromCollection(trainInput)).as("input");
+ Table predictTable = tEnv.fromDataStream(env.fromCollection(predictInput)).as("input");
+
+ // Creates a VectorIndexer: features with at most 3 distinct values become categorical; invalid entries are kept.
+ VectorIndexer vectorIndexer =
+ new VectorIndexer()
+ .setInputCol("input")
+ .setOutputCol("output")
+ .setHandleInvalid(HasHandleInvalid.KEEP_INVALID)
+ .setMaxCategories(3);
+
+ // Trains the VectorIndexer model on the training data.
+ VectorIndexerModel model = vectorIndexer.fit(trainTable);
+
+ // Uses the trained model to index the prediction data.
+ Table outputTable = model.transform(predictTable)[0];
+
+ // Extracts and displays the results.
+ for (CloseableIterator<Row> it = outputTable.execute().collect(); it.hasNext(); ) {
+ Row row = it.next();
+ System.out.printf(
+ "Input Value: %s \tOutput Value: %s\n",
+ row.getField(vectorIndexer.getInputCol()),
+ row.getField(vectorIndexer.getOutputCol()));
+ }
+ }
+}
+
+```
+
+{{< /tab>}}
+
+{{< tab "Python">}}
+
+```python
+# Simple program that trains a VectorIndexer model and uses it for feature
+# engineering.
+
+from pyflink.common import Types
+from pyflink.ml.core.linalg import Vectors, DenseVectorTypeInfo
+from pyflink.datastream import StreamExecutionEnvironment
+from pyflink.ml.lib.feature.vectorindexer import VectorIndexer
+from pyflink.table import StreamTableEnvironment
+
+# Creates a new StreamExecutionEnvironment.
+env = StreamExecutionEnvironment.get_execution_environment()
+
+# Creates a StreamTableEnvironment.
+t_env = StreamTableEnvironment.create(env)
+
+# Generates training and prediction data; the prediction data contains a value (2) unseen in training.
+train_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(1, 1),),
+ (Vectors.dense(2, -1),),
+ (Vectors.dense(3, 1),),
+ (Vectors.dense(4, 0),),
+ (Vectors.dense(5, 0),)
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input', ],
+ [DenseVectorTypeInfo(), ])))
+
+predict_table = t_env.from_data_stream(
+ env.from_collection([
+ (Vectors.dense(0, 2),),
+ (Vectors.dense(0, 0),),
+ (Vectors.dense(0, -1),),
+ ],
+ type_info=Types.ROW_NAMED(
+ ['input', ],
+ [DenseVectorTypeInfo(), ])))
+
+# Creates a VectorIndexer: features with at most 3 distinct values become categorical; invalid entries are kept.
+vector_indexer = VectorIndexer() \
+ .set_input_col('input') \
+ .set_output_col('output') \
+ .set_handle_invalid('keep') \
+ .set_max_categories(3)
+
+# Trains the VectorIndexer model on the training data.
+model = vector_indexer.fit(train_table)
+
+# Uses the trained model to index the prediction data.
+output = model.transform(predict_table)[0]
+
+# Extracts and displays the results.
+field_names = output.get_schema().get_field_names()
+for result in t_env.to_data_stream(output).execute_and_collect():
+ print('Input Value: ' + str(result[field_names.index(vector_indexer.get_input_col())])
+ + '\tOutput Value: ' + str(result[field_names.index(vector_indexer.get_output_col())]))
+
+```
+
+{{< /tab>}}
+
+{{< /tabs>}}
diff --git a/flink-ml-python/pyflink/examples/ml/feature/hashingtf_example.py b/flink-ml-python/pyflink/examples/ml/feature/hashingtf_example.py
index 50dce84..6698f78 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/hashingtf_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/hashingtf_example.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-# Simple program that creates a VectorAssembler instance and uses it for feature
+# Simple program that creates a HashingTF instance and uses it for feature
# engineering.
from pyflink.common import Types
diff --git a/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
index 5d9b41a..f38a733 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/indextostringmodel_example.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-# Simple program that creates an IndexToStringModelExample instance and uses it
+# Simple program that creates an IndexToStringModel instance and uses it
# for feature engineering.
from pyflink.common import Types
diff --git a/flink-ml-python/pyflink/examples/ml/feature/kbinsdiscreteizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/kbinsdiscreteizer_example.py
index 412e6d9..d6841c3 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/kbinsdiscreteizer_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/kbinsdiscreteizer_example.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-# Simple program that trains a StringIndexer model and uses it for feature
+# Simple program that trains a KBinsDiscretizer model and uses it for feature
# engineering.
from pyflink.common import Types
diff --git a/flink-ml-python/pyflink/examples/ml/feature/regextokenizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/regextokenizer_example.py
index 73c1da7..0bbde75 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/regextokenizer_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/regextokenizer_example.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-# Simple program that creates a VectorAssembler instance and uses it for feature
+# Simple program that creates a RegexTokenizer instance and uses it for feature
# engineering.
from pyflink.common import Types
diff --git a/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py b/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py
index 05e56da..aa8f1ed 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/tokenizer_example.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-# Simple program that creates a VectorAssembler instance and uses it for feature
+# Simple program that creates a Tokenizer instance and uses it for feature
# engineering.
from pyflink.common import Types
@@ -51,4 +51,4 @@ field_names = output.get_schema().get_field_names()
for result in t_env.to_data_stream(output).execute_and_collect():
input_value = result[field_names.index(tokenizer.get_input_col())]
output_value = result[field_names.index(tokenizer.get_output_col())]
- print('Input Values: ' + str(input_value) + '\tOutput Value: ' + str(output_value))
+ print('Input Value: ' + str(input_value) + '\tOutput Values: ' + str(output_value))
diff --git a/flink-ml-python/pyflink/examples/ml/feature/vectorindexer_example.py b/flink-ml-python/pyflink/examples/ml/feature/vectorindexer_example.py
index f2bf9c1..25e9f48 100644
--- a/flink-ml-python/pyflink/examples/ml/feature/vectorindexer_example.py
+++ b/flink-ml-python/pyflink/examples/ml/feature/vectorindexer_example.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-# Simple program that trains a StringIndexer model and uses it for feature
+# Simple program that trains a VectorIndexer model and uses it for feature
# engineering.
from pyflink.common import Types