Posted to commits@flink.apache.org by li...@apache.org on 2022/02/18 09:54:41 UTC
[flink-ml] branch release-2.0 updated: [FLINK-26100][docs] Add doc for ops & key concepts (#61)
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink-ml.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new c93c119 [FLINK-26100][docs] Add doc for ops & key concepts (#61)
c93c119 is described below
commit c93c11960f7f97438feb4dd414e73c8468cf1726
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Fri Feb 18 17:54:34 2022 +0800
[FLINK-26100][docs] Add doc for ops & key concepts (#61)
This closes #61.
---
.../docs/{operators => development}/_index.md | 5 +-
docs/content/docs/development/iteration.md | 225 +++++++++++++++++++
docs/content/docs/development/overview.md | 246 +++++++++++++++++++++
docs/content/docs/development/types.md | 49 ++++
docs/content/docs/operators/_index.md | 2 +-
.../{clustering => classification}/_index.md | 6 +-
docs/content/docs/operators/classification/knn.md | 116 ++++++++++
.../operators/classification/logisticregression.md | 108 +++++++++
.../docs/operators/classification/naivebayes.md | 100 +++++++++
docs/content/docs/operators/clustering/_index.md | 2 +-
docs/content/docs/operators/clustering/kmeans.md | 17 +-
.../operators/{clustering => feature}/_index.md | 4 +-
.../docs/operators/feature/onehotencoder.md | 83 +++++++
.../org/apache/flink/ml/builder/GraphModel.java | 4 +-
14 files changed, 947 insertions(+), 20 deletions(-)
diff --git a/docs/content/docs/operators/_index.md b/docs/content/docs/development/_index.md
similarity index 88%
copy from docs/content/docs/operators/_index.md
copy to docs/content/docs/development/_index.md
index c45aa20..3229296 100644
--- a/docs/content/docs/operators/_index.md
+++ b/docs/content/docs/development/_index.md
@@ -1,7 +1,8 @@
---
-title: Operators
-icon: <i class="fa fa-book title maindish" aria-hidden="true"></i>
+title: Development
+icon: <i class="fa fa-code title maindish" aria-hidden="true"></i>
bold: true
+sectionBreak: true
bookCollapseSection: true
weight: 2
---
diff --git a/docs/content/docs/development/iteration.md b/docs/content/docs/development/iteration.md
new file mode 100644
index 0000000..6e656be
--- /dev/null
+++ b/docs/content/docs/development/iteration.md
@@ -0,0 +1,225 @@
+---
+title: "Iteration"
+weight: 2
+type: docs
+aliases:
+- /development/iteration.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Iteration
+
+Iteration is a basic building block for an ML library. In machine learning
+algorithms, iteration may be used in offline or online training processes. In
+general, two types of iterations are required, and Flink ML supports both of
+them in order to provide the infrastructure for a variety of algorithms.
+
+1. **Bounded Iteration**: Usually used in the offline case. In this case the
+ algorithm usually trains on a bounded dataset and updates the parameters for
+ multiple rounds until convergence.
+2. **Unbounded Iteration**: Usually used in the online case. In this case the
+ algorithm usually trains on an unbounded dataset. It accumulates a mini-batch
+ of data and then does one update to the parameters.
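+
+As a rough, framework-free illustration of the two modes, the plain-Java sketch
+below fits the single parameter `w` of the model `y = w * x` by gradient
+descent. It does not use Flink ML, and the class and method names are
+hypothetical. `trainBounded` makes repeated passes over a bounded dataset until
+the update falls below a tolerance. `trainUnbounded` reads a stream (simulated
+here by an iterator) and performs exactly one update per mini-batch.
+
+```java
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+// Hypothetical illustration only; not part of the Flink ML API.
+public class IterationModes {
+    // Mean gradient of 0.5 * (w * x - y)^2 over a batch of {x, y} samples.
+    static double gradient(double w, List<double[]> batch) {
+        double sum = 0.0;
+        for (double[] p : batch) {
+            sum += (w * p[0] - p[1]) * p[0];
+        }
+        return sum / batch.size();
+    }
+
+    // Bounded iteration: many rounds over the same dataset until convergence.
+    static double trainBounded(List<double[]> data, double lr, double tol, int maxRounds) {
+        double w = 0.0;
+        for (int round = 0; round < maxRounds; round++) {
+            double step = lr * gradient(w, data);
+            w -= step;
+            if (Math.abs(step) < tol) {
+                break; // termination criterion reached
+            }
+        }
+        return w;
+    }
+
+    // Unbounded iteration: accumulate a mini-batch, then do one update.
+    static List<Double> trainUnbounded(Iterator<double[]> stream, int batchSize, double lr) {
+        double w = 0.0;
+        List<Double> modelVersions = new ArrayList<>();
+        List<double[]> batch = new ArrayList<>();
+        while (stream.hasNext()) {
+            batch.add(stream.next());
+            if (batch.size() == batchSize) {
+                w -= lr * gradient(w, batch);
+                modelVersions.add(w); // one new model version per mini-batch
+                batch.clear();
+            }
+        }
+        return modelVersions;
+    }
+
+    public static void main(String[] args) {
+        List<double[]> data = new ArrayList<>();
+        for (int i = 1; i <= 4; i++) {
+            data.add(new double[] {i, 2.0 * i}); // samples of y = 2x
+        }
+        System.out.println(trainBounded(data, 0.05, 1e-9, 10_000));
+        System.out.println(trainUnbounded(data.iterator(), 2, 0.05));
+    }
+}
+```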
+
+## Iteration Paradigm
+
+An iterative algorithm has the following behavior pattern:
+
+- The iterative algorithm has an ***iteration body*** that is repeatedly invoked
+ until some termination criterion is reached (e.g. after a user-specified number
+ of epochs has been reached). An iteration body is a subgraph of operators that
+ implements the computation logic of e.g. an iterative machine learning
+ algorithm, whose outputs might be fed back as the inputs of this subgraph.
+- In each invocation, the iteration body updates the model parameters based on
+ the user-provided data as well as the most recent model parameters.
+- The iterative algorithm takes as inputs the user-provided data and the initial
+ model parameters.
+- The iterative algorithm could output arbitrary user-defined information, such
+ as the loss after each epoch, or the final model parameters.
+
+Therefore, the behavior of an iterative algorithm could be characterized with
+the following iteration paradigm (w.r.t. Flink concepts):
+
+- An iteration-body is a Flink subgraph with the following inputs and outputs:
+ - Inputs: **model-variables** (as a list of data streams) and
+ **user-provided-data** (as another list of data streams)
+ - Outputs: **feedback-model-variables** (as a list of data streams) and
+ **user-observed-outputs** (as a list of data streams)
+- A **termination-condition** that specifies when the iterative execution of the
+ iteration body should terminate.
+- To execute an iteration body, a user provides the following inputs and gets
+ the following outputs.
+ - Inputs: **initial-model-variables** (as a list of bounded data streams) and
+ **user-provided-data** (as a list of data streams)
+ - Outputs: the **user-observed-output** emitted by the iteration body.
+
+It is important to note that the **model-variables** expected by the iteration
+body are not the same as the **initial-model-variables** provided by the user.
+Instead, **model-variables** are computed as the union of the
+**feedback-model-variables** (emitted by the iteration body) and the
+**initial-model-variables** (provided by the caller of the iteration body).
+Flink ML provides a utility class, `Iterations`, to run an iteration body with
+the user-provided inputs.
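+
+To make this data flow concrete, the plain-Java sketch below simulates the
+paradigm with in-memory lists instead of data streams. All names in it are
+hypothetical and not part of Flink ML. In this round-based simulation, the
+union of initial and feedback model-variables means the following: the first
+invocation of the body sees the initial-model-variables, and every later
+invocation sees the feedback-model-variables of the previous one. The sketch
+assumes the termination condition is that the body emits no feedback.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BiFunction;
+
+// Hypothetical simulation of the iteration paradigm; not the Flink ML API.
+public class IterationParadigm {
+    // Outputs of one invocation of the iteration body.
+    record BodyResult(List<Double> feedbackVariables, List<Double> outputs) {}
+
+    static List<Double> iterate(
+            List<Double> initialVariables,
+            List<Double> userData,
+            BiFunction<List<Double>, List<Double>, BodyResult> body) {
+        List<Double> userObservedOutputs = new ArrayList<>();
+        List<Double> modelVariables = initialVariables; // first round: initial values
+        while (!modelVariables.isEmpty()) { // terminate once no feedback is emitted
+            BodyResult result = body.apply(modelVariables, userData);
+            userObservedOutputs.addAll(result.outputs());
+            modelVariables = result.feedbackVariables(); // later rounds: feedback
+        }
+        return userObservedOutputs;
+    }
+
+    // Example body: moves one parameter halfway towards the mean of the data.
+    static BodyResult meanBody(List<Double> variables, List<Double> data) {
+        double mean = data.stream().mapToDouble(Double::doubleValue).average().orElse(0.0);
+        double current = variables.get(0);
+        double updated = current + 0.5 * (mean - current);
+        if (Math.abs(updated - current) < 1e-6) {
+            // Converged: emit no feedback and output the final parameter.
+            return new BodyResult(List.of(), List.of(updated));
+        }
+        return new BodyResult(List.of(updated), List.of());
+    }
+
+    public static void main(String[] args) {
+        List<Double> outputs =
+                iterate(List.of(0.0), List.of(1.0, 2.0, 3.0), IterationParadigm::meanBody);
+        System.out.println(outputs); // one value, close to the data mean 2.0
+    }
+}
+```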
+
+The figure below summarizes the iteration paradigm described above.
+
+{{< mermaid >}}
+flowchart LR
+
+subgraph Iteration Body
+union1
+union2
+node11
+node12
+node21
+node22
+nodeX
+end
+
+input0 --> node11
+
+union1 -. feedback .- node12
+input1 --> union1
+union1 --> node11
+node11 --> nodeX
+nodeX --> node12
+node12 --> output1
+
+input2 --> union2
+union2 --> node21
+node21 --> nodeX
+nodeX --> node22
+node22 --> output2
+union2 -. feedback .- node22
+
+input0[non-iterate input]
+input1[iterate input]
+input2[iterate input]
+union1[union]
+union2[union]
+node11( )
+node12( )
+nodeX( )
+node21( )
+node22( )
+output1[output]
+output2[output]
+
+{{< /mermaid >}}
+
+## API
+
+The main entry point of Flink ML's iteration API is the `Iterations` class. It
+provides two public methods: `iterateUnboundedStreams` and
+`iterateBoundedStreamsUntilTermination`. Users choose one of them depending on
+whether the input data is unbounded or bounded.
+
+```java
+public class Iterations {
+ public static DataStreamList iterateUnboundedStreams(
+ DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}
+ ...
+ public static DataStreamList iterateBoundedStreamsUntilTermination(
+ DataStreamList initVariableStreams,
+ ReplayableDataStreamList dataStreams,
+ IterationConfig config,
+ IterationBody body){...}
+}
+```
+
+To construct an iteration, users are required to provide:
+
+- `initVariableStreams`: the initial values of the variable data streams, which
+ are updated in each round.
+- `dataStreams`: the other data streams used inside the iteration, which are not
+ updated.
+- `config` (`iterateBoundedStreamsUntilTermination` only): the configuration of
+ the iteration, e.g. the operator round mode.
+- `body`: the `IterationBody` that specifies the subgraph which updates the
+ variable streams and produces the outputs.
+
+The `IterationBody` is invoked with two parameters. The first parameter is a
+list of input variable streams, created as the union of the initial variable
+streams and the corresponding feedback variable streams (returned by the
+iteration body). The second parameter is the list of data streams given to the
+`Iterations` method.
+
+```java
+public interface IterationBody extends Serializable {
+ ...
+ IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
+ ...
+}
+```
+
+During the execution of the iteration body, each record involved in the
+iteration has an epoch attached, which marks the progress of the iteration. The
+epoch is computed as follows:
+
+- All records in the initial variable streams and initial data streams have
+ epoch = 0.
+- For any record emitted by an operator into a non-feedback stream, the epoch
+ of the emitted record = the epoch of the input record that triggers this
+ emission. If the record is emitted by `onEpochWatermarkIncremented()`, then
+ the epoch of the record = `epochWatermark`.
+- For any record emitted by an operator into a feedback variable stream, the
+ epoch of the emitted record = the epoch of the input record that triggers this
+ emission + 1.
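+
+The epoch rules above can be written down as a few plain-Java helpers. The
+`Tagged` record and the methods are hypothetical illustrations of the rules,
+not Flink ML classes. In Flink ML the framework attaches epochs itself.
+
+```java
+// Hypothetical illustration of the epoch rules; not part of the Flink ML API.
+public class EpochRules {
+    // A value together with the epoch attached to it.
+    record Tagged<T>(T value, int epoch) {}
+
+    // Records in the initial variable streams and data streams have epoch 0.
+    static <T> Tagged<T> initial(T value) {
+        return new Tagged<>(value, 0);
+    }
+
+    // Emitted into a non-feedback stream: keeps the triggering record's epoch.
+    static <T, R> Tagged<R> emitToOutput(Tagged<T> trigger, R value) {
+        return new Tagged<>(value, trigger.epoch());
+    }
+
+    // Emitted by onEpochWatermarkIncremented(): carries the epoch watermark.
+    static <R> Tagged<R> emitOnWatermark(int epochWatermark, R value) {
+        return new Tagged<>(value, epochWatermark);
+    }
+
+    // Emitted into a feedback variable stream: triggering epoch + 1.
+    static <T, R> Tagged<R> emitToFeedback(Tagged<T> trigger, R value) {
+        return new Tagged<>(value, trigger.epoch() + 1);
+    }
+
+    public static void main(String[] args) {
+        Tagged<Double> model = initial(0.0);
+        Tagged<Double> updated = emitToFeedback(model, 1.0);
+        Tagged<String> loss = emitToOutput(updated, "loss");
+        System.out.println(model.epoch() + " " + updated.epoch() + " " + loss.epoch());
+    }
+}
+```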
+
+The framework delivers a notification at the end of each epoch to operators
+and UDFs that implement `IterationListener`.
+
+```java
+public interface IterationListener<T> {
+ void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector)
+ throws Exception;
+ ...
+ void onIterationTerminated(Context context, Collector<T> collector) throws Exception;
+}
+```
+
+## Example Usage
+
+The following example shows how to use iterations.
+
+```java
+DataStream<double[]> initParameters = ...
+DataStream<Tuple2<double[], Double>> dataset = ...
+
+// Replayable data streams and an IterationConfig match the bounded-iteration
+// signature shown above.
+DataStreamList resultStreams = Iterations.iterateBoundedStreamsUntilTermination(
+        DataStreamList.of(initParameters),
+        ReplayableDataStreamList.notReplay(dataset),
+        IterationConfig.newBuilder().setOperatorRoundMode(ALL_ROUND).build(),
+        (variableStreams, dataStreams) -> {
+            DataStream<double[]> modelUpdate = variableStreams.get(0);
+            // A new name avoids redeclaring the enclosing `dataset` variable.
+            DataStream<Tuple2<double[], Double>> trainData = dataStreams.get(0);
+            DataStream<double[]> newModelUpdate = ...
+            DataStream<double[]> modelOutput = ...
+            return new IterationBodyResult(
+                    DataStreamList.of(newModelUpdate),
+                    DataStreamList.of(modelOutput));
+        });
+
+// The outputs of the iteration body are accessed by position.
+DataStream<double[]> finalModel = resultStreams.get(0);
+```
+
+- `initParameters`: input data that needs to be transmitted through the
+ feedback edge.
+- `dataset`: input data that does not need to be transmitted through the
+ feedback edge.
+- `newModelUpdate`: data to be transmitted through the feedback edge.
+- `modelOutput`: final output of the iteration body.
diff --git a/docs/content/docs/development/overview.md b/docs/content/docs/development/overview.md
new file mode 100644
index 0000000..6075e23
--- /dev/null
+++ b/docs/content/docs/development/overview.md
@@ -0,0 +1,246 @@
+---
+title: "Overview"
+weight: 1
+type: docs
+aliases:
+- /development/overview.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Overview
+
+This document provides a brief introduction to the basic concepts in Flink ML.
+
+## Table API
+
+Flink ML's API is based on Flink's Table API. The Table API is a
+language-integrated query API for Java, Scala, and Python that allows the
+composition of queries from relational operators such as selection, filter, and
+join in a very intuitive way.
+
+The Table API supports a wide range of data types. The [Data
+Types](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/types/)
+page of the Flink documentation lists the supported types. In addition to these
+types, Flink ML also supports the `Vector` type.
+
+The Table API integrates seamlessly with Flink’s DataStream API. You can easily
+switch between all APIs and the libraries that build upon them. Please refer to
+Flink's documentation for how to convert between `Table` and `DataStream`, and
+for other usage of the Flink Table API.
+
+## Stage
+
+A `Stage` is a node in a `Pipeline` or `Graph`. It is the fundamental component
+in Flink ML. This interface is only a concept and does not have any actual
+functionality. Its subclasses include the following.
+
+- `Estimator`: An `Estimator` is a `Stage` that is responsible for the training
+ process in machine learning algorithms. It implements a `fit()` method that
+ takes a list of tables and produces a `Model`.
+
+- `AlgoOperator`: An `AlgoOperator` is a `Stage` that is used to encode generic
+ multi-input multi-output computation logic. It implements a `transform()`
+ method, which applies certain computation logic on the given input tables and
+ returns a list of result tables.
+
+- `Transformer`: A `Transformer` is an `AlgoOperator` with the semantic
+ difference that it encodes transformation logic, such that a record in the
+ output typically corresponds to one record in the input. In contrast, an
+ `AlgoOperator` is a better fit to express aggregation logic where a record in
+ the output could be computed from an arbitrary number of records in the input.
+
+- `Model`: A `Model` is a `Transformer` with extra APIs to set and get model
+ data. It is typically generated by fitting an `Estimator` on a list of tables.
+ It provides `getModelData()` and `setModelData()`, which allow users to
+ explicitly read model data tables from the model or write them to it. Each
+ table could be an unbounded stream of model data changes.
+
+A typical usage of `Stage` is to create an `Estimator` instance first, trigger
+its training process by invoking its `fit()` method, and then perform
+predictions with the resulting `Model` instance. The code below shows this
+usage.
+
+```java
+// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
+
+Table trainData = ...;
+Table predictData = ...;
+
+SumEstimator estimator = new SumEstimator();
+SumModel model = estimator.fit(trainData);
+Table predictResult = model.transform(predictData)[0];
+```
+
+## Builders
+
+Flink ML provides APIs that manage the relationships and structure of stages in
+Flink jobs. These APIs let users organize stages into more complex structures
+to achieve advanced functionality, such as chaining data processing and machine
+learning algorithms together. The entry points of these APIs are `Pipeline`
+and `Graph`.
+
+### Pipeline
+
+A `Pipeline` acts as an `Estimator`. It consists of an ordered list of stages,
+each of which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`.
+Its `fit()` method goes through all stages of this pipeline in order and does
+the following on each stage until the last `Estimator` (inclusive).
+
+- If a stage is an `Estimator`, `fit()` invokes the stage's `fit()` method with
+ the input tables to generate a `Model`. If there is an `Estimator` after this
+ stage, it transforms the input tables using the generated `Model` to get
+ result tables, then passes the result tables to the next stage as inputs.
+- If a stage is an `AlgoOperator` AND there is an `Estimator` after this stage,
+ it transforms the input tables using this stage to get result tables, then
+ passes the result tables to the next stage as inputs.
+
+After all the `Estimator`s are trained to fit their input tables, a new
+`PipelineModel` will be created with the same stages in this pipeline, except
+that all the `Estimator`s in the `PipelineModel` are replaced with the models
+generated in the above process.
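+
+The control flow of `Pipeline::fit` and `PipelineModel::transform` can be
+sketched in plain Java. The interfaces below are simplified, hypothetical
+stand-ins for Flink ML's `Stage`, `AlgoOperator`, `Model` and `Estimator`.
+Tables are reduced to lists of doubles, and every stage has exactly one input
+and one output. The hypothetical `SumModel`/`SumEstimator` idea from this page's
+examples is reused as lambdas. The sketch only mirrors the rules described
+above and is not the Flink ML implementation.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+
+// Hypothetical, simplified sketch; not the Flink ML implementation.
+public class PipelineSketch {
+    interface Stage {}
+
+    interface AlgoOperator extends Stage {
+        List<Double> transform(List<Double> input);
+    }
+
+    interface Model extends AlgoOperator {}
+
+    interface Estimator extends Stage {
+        Model fit(List<Double> input);
+    }
+
+    // Pipeline::fit, returning the stages of the resulting PipelineModel.
+    static List<AlgoOperator> fit(List<Stage> stages, List<Double> input) {
+        int lastEstimator = -1;
+        for (int i = 0; i < stages.size(); i++) {
+            if (stages.get(i) instanceof Estimator) {
+                lastEstimator = i;
+            }
+        }
+        List<AlgoOperator> modelStages = new ArrayList<>();
+        List<Double> current = input;
+        for (int i = 0; i < stages.size(); i++) {
+            Stage stage = stages.get(i);
+            // Estimators are replaced by the Models they generate.
+            AlgoOperator op = stage instanceof Estimator
+                    ? ((Estimator) stage).fit(current)
+                    : (AlgoOperator) stage;
+            modelStages.add(op);
+            if (i < lastEstimator) { // only needed if an Estimator follows
+                current = op.transform(current);
+            }
+        }
+        return modelStages;
+    }
+
+    // PipelineModel::transform: the output of one stage feeds the next.
+    static List<Double> transform(List<AlgoOperator> modelStages, List<Double> input) {
+        List<Double> current = input;
+        for (AlgoOperator op : modelStages) {
+            current = op.transform(current);
+        }
+        return current;
+    }
+
+    // Hypothetical SumModel: adds a constant to every value.
+    static Model sumModel(double delta) {
+        return input -> input.stream().map(v -> v + delta).toList();
+    }
+
+    // Hypothetical SumEstimator: the fitted constant is the sum of its input.
+    static Estimator sumEstimator() {
+        return input -> sumModel(input.stream().mapToDouble(Double::doubleValue).sum());
+    }
+
+    public static void main(String[] args) {
+        List<Stage> stages = List.of(sumModel(10), sumEstimator(), sumModel(30));
+        List<AlgoOperator> pipelineModel = fit(stages, List.of(1.0, 2.0));
+        System.out.println(transform(pipelineModel, List.of(1.0, 2.0)));
+    }
+}
+```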
+
+A `PipelineModel` acts as a `Model`. It consists of an ordered list of stages,
+each of which could be a `Model`, `Transformer` or `AlgoOperator`. Its
+`transform()` method applies all stages in this `PipelineModel` on the input
+tables in order. The output of one stage is used as the input of the next stage
+(if any). The output of the last stage is returned as the result of this method.
+
+A `Pipeline` can be created by passing a list of `Stage`s to Pipeline's
+constructor. For example,
+
+```java
+// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
+
+Model modelA = new SumModel().setModelData(tEnv.fromValues(10));
+Estimator estimatorA = new SumEstimator();
+Model modelB = new SumModel().setModelData(tEnv.fromValues(30));
+
+List<Stage<?>> stages = Arrays.asList(modelA, estimatorA, modelB);
+Estimator<?, ?> estimator = new Pipeline(stages);
+```
+
+The code above creates the following pipeline.
+
+{{< mermaid >}}
+
+graph LR
+
+empty0[ ] --> modelA --> estimatorA --> modelB --> empty1[ ]
+
+style empty0 fill:#FFFFFF, stroke:#FFFFFF;
+style empty1 fill:#FFFFFF, stroke:#FFFFFF;
+
+{{< /mermaid >}}
+
+### Graph
+
+A `Graph` acts as an `Estimator`. A `Graph` consists of a DAG of stages, each of
+which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`. When
+`Graph::fit` is called, the stages are executed in a topologically-sorted order.
+If a stage is an `Estimator`, its `Estimator::fit` method will be called on the
+input tables (from the input edges) to fit a `Model`. Then the `Model` will be
+used to transform the input tables and produce output tables to the output
+edges. If a stage is an `AlgoOperator`, its `AlgoOperator::transform` method
+will be called on the input tables and produce output tables to the output
+edges. The `GraphModel` fitted from a `Graph` consists of the fitted `Model`s
+and `AlgoOperator`s, corresponding to the `Graph`'s stages.
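+
+The "topologically-sorted order" mentioned above can be computed, for example,
+with Kahn's algorithm. The plain-Java sketch below is a hypothetical
+illustration that only orders stage names connected by table edges. It is not
+taken from Flink ML's `Graph` implementation. A stage becomes ready once every
+stage that produces one of its input tables has been executed.
+
+```java
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+// Hypothetical illustration; not part of the Flink ML API.
+public class TopologicalOrder {
+    // edges: stage -> stages that consume one of its output tables.
+    static List<String> order(Map<String, List<String>> edges) {
+        Map<String, Integer> inDegree = new LinkedHashMap<>();
+        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
+            inDegree.putIfAbsent(e.getKey(), 0);
+            for (String target : e.getValue()) {
+                inDegree.merge(target, 1, Integer::sum);
+            }
+        }
+        Deque<String> ready = new ArrayDeque<>();
+        inDegree.forEach((stage, degree) -> {
+            if (degree == 0) {
+                ready.add(stage);
+            }
+        });
+        List<String> result = new ArrayList<>();
+        while (!ready.isEmpty()) {
+            String stage = ready.poll();
+            result.add(stage); // all input tables of this stage are available now
+            for (String next : edges.getOrDefault(stage, List.of())) {
+                if (inDegree.merge(next, -1, Integer::sum) == 0) {
+                    ready.add(next);
+                }
+            }
+        }
+        if (result.size() != inDegree.size()) {
+            throw new IllegalArgumentException("the stages do not form a DAG");
+        }
+        return result;
+    }
+
+    public static void main(String[] args) {
+        Map<String, List<String>> edges = new LinkedHashMap<>();
+        edges.put("stage1", List.of("stage2"));
+        edges.put("stage2", List.of("stage3"));
+        System.out.println(order(edges)); // [stage1, stage2, stage3]
+    }
+}
+```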
+
+A `GraphModel` acts as a `Model`. A `GraphModel` consists of a DAG of stages,
+each of which could be a `Model`, `Transformer` or `AlgoOperator`.
+When `GraphModel::transform` is called, the stages are executed in a
+topologically-sorted order. When a stage is executed, its
+`AlgoOperator::transform` method will be called on the input tables (from the
+input edges) and produce output tables to the output edges.
+
+A `Graph` can be constructed via the `GraphBuilder` class, which provides
+methods like `addAlgoOperator` or `addEstimator` to help add stages to a
+graph. Flink ML also introduces the `TableId` class to represent the input/output of
+a stage and to help express the relationship between stages in a graph, thus
+allowing users to construct the DAG before they have the concrete tables
+available.
+
+The example code below shows how to build a `Graph`.
+
+```java
+// Suppose SumModel is a concrete subclass of Model.
+
+GraphBuilder builder = new GraphBuilder();
+// Creates nodes.
+SumModel stage1 = new SumModel().setModelData(tEnv.fromValues(1));
+SumModel stage2 = new SumModel();
+SumModel stage3 = new SumModel().setModelData(tEnv.fromValues(3));
+// Creates inputs and modelDataInputs.
+TableId input = builder.createTableId();
+TableId modelDataInput = builder.createTableId();
+// Feeds inputs and gets outputs.
+TableId output1 = builder.addAlgoOperator(stage1, input)[0];
+TableId output2 = builder.addAlgoOperator(stage2, output1)[0];
+builder.setModelDataOnModel(stage2, modelDataInput);
+TableId output3 = builder.addAlgoOperator(stage3, output2)[0];
+TableId modelDataOutput = builder.getModelDataFromModel(stage3)[0];
+
+// Builds a Model from the graph.
+TableId[] inputs = new TableId[] {input};
+TableId[] outputs = new TableId[] {output3};
+TableId[] modelDataInputs = new TableId[] {modelDataInput};
+TableId[] modelDataOutputs = new TableId[] {modelDataOutput};
+Model<?> model = builder.buildModel(inputs, outputs, modelDataInputs, modelDataOutputs);
+```
+
+The code above constructs the following `Graph`.
+
+{{< mermaid >}}
+
+graph LR
+
+empty0[ ] --> |input| stage1
+stage1 --> |output1| stage2
+empty1[ ] --> |modelDataInput| stage2
+stage2 --> |output2| stage3
+stage3 --> |output3| empty3[ ]
+stage3 --> |modelDataOutput| empty2[ ]
+
+style empty0 fill:#FFFFFF, stroke:#FFFFFF;
+style empty1 fill:#FFFFFF, stroke:#FFFFFF;
+style empty2 fill:#FFFFFF, stroke:#FFFFFF;
+style empty3 fill:#FFFFFF, stroke:#FFFFFF;
+
+{{< /mermaid >}}
+
+## Parameter
+
+In Flink ML, `Stage` is a subclass of `WithParams`, which provides a uniform API
+to get and set parameters.
+
+A `Param` is the definition of a parameter, including its name, class,
+description, default value and validator.
+
+To set the parameters of an algorithm, users can use either of the following
+ways.
+
+- Invoke the parameter's specific setter method. For example, to set `K`, the
+ number of clusters of a K-means algorithm, users can directly invoke the
+ `setK()` method on that `KMeans` instance.
+- Pass a parameter map containing new values to the stage through the
+ `ReadWriteUtils.updateExistingParams()` method.
+
+If a `Model` is generated through an `Estimator`'s `fit()` method, the `Model`
+inherits the `Estimator` object's parameters. Thus there is no need to set the
+parameters a second time if they are not changed.
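+
+The parameter mechanism described above can be pictured with a small plain-Java
+sketch. `SimpleParam` and `ParamHolder` are hypothetical classes, not Flink ML's
+`Param` and `WithParams`, whose real API is richer. The sketch shows three
+things: a parameter definition with a name, a default value and a validator; a
+setter that rejects invalid values; and a fitted model that starts from a copy
+of the estimator's parameter values.
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+
+// Hypothetical illustration; not the Flink ML parameter API.
+public class ParamSketch {
+    // Definition of a parameter: name, default value and validator.
+    record SimpleParam<T>(String name, T defaultValue, Predicate<T> validator) {}
+
+    static class ParamHolder {
+        private final Map<SimpleParam<?>, Object> values = new HashMap<>();
+
+        <T> ParamHolder set(SimpleParam<T> param, T value) {
+            if (!param.validator().test(value)) {
+                throw new IllegalArgumentException(
+                        "Invalid value for " + param.name() + ": " + value);
+            }
+            values.put(param, value);
+            return this;
+        }
+
+        @SuppressWarnings("unchecked")
+        <T> T get(SimpleParam<T> param) {
+            // Falls back to the default value if the parameter was never set.
+            return values.containsKey(param) ? (T) values.get(param) : param.defaultValue();
+        }
+
+        // A model produced by fit() starts with a copy of the estimator's values.
+        ParamHolder copyTo(ParamHolder target) {
+            target.values.putAll(values);
+            return target;
+        }
+    }
+
+    // Hypothetical "k" parameter with default 2 that must be greater than 1.
+    static final SimpleParam<Integer> K = new SimpleParam<>("k", 2, k -> k > 1);
+
+    public static void main(String[] args) {
+        ParamHolder estimator = new ParamHolder().set(K, 5);
+        ParamHolder model = estimator.copyTo(new ParamHolder());
+        System.out.println(model.get(K)); // 5, inherited from the estimator
+    }
+}
+```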
diff --git a/docs/content/docs/development/types.md b/docs/content/docs/development/types.md
new file mode 100644
index 0000000..2ffcf0e
--- /dev/null
+++ b/docs/content/docs/development/types.md
@@ -0,0 +1,49 @@
+---
+title: "Data Types"
+weight: 3
+type: docs
+aliases:
+- /development/types.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Data Types
+
+Flink ML supports all data types supported by the Flink Table API, as well as
+the data types listed in the sections below.
+
+## Vector
+
+Flink ML provides support for vectors of double values. A `Vector` in Flink ML
+can be either dense (`DenseVector`) or sparse (`SparseVector`), depending on how
+users create it according to the vector's sparsity. Each vector is initialized
+with a fixed size, and users may get or set the double value at any 0-based
+index in the vector.
+
+Flink ML also has a class named `Vectors` providing utility methods for
+instantiating vectors.
+
+```java
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+
+int n = 4;
+int[] indices = new int[] {0, 2, 3};
+double[] values = new double[] {0.1, 0.3, 0.4};
+
+SparseVector vector = Vectors.sparse(n, indices, values);
+```
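+
+As a rough illustration of what the sparse representation stores, the
+plain-Java sketch below keeps only the indices and values of the non-zero
+entries. It answers `get` with a binary search and returns 0.0 for positions
+that are not stored. `SimpleSparseVector` is hypothetical and is not Flink ML's
+`SparseVector`. It assumes the indices are given in ascending order.
+
+```java
+import java.util.Arrays;
+
+// Hypothetical illustration; not Flink ML's SparseVector.
+public class SimpleSparseVector {
+    private final int size;
+    private final int[] indices; // ascending 0-based positions of stored values
+    private final double[] values;
+
+    public SimpleSparseVector(int size, int[] indices, double[] values) {
+        if (indices.length != values.length) {
+            throw new IllegalArgumentException("indices and values differ in length");
+        }
+        this.size = size;
+        this.indices = indices.clone();
+        this.values = values.clone();
+    }
+
+    public double get(int i) {
+        if (i < 0 || i >= size) {
+            throw new IndexOutOfBoundsException(i);
+        }
+        int pos = Arrays.binarySearch(indices, i);
+        return pos >= 0 ? values[pos] : 0.0; // unstored positions are zero
+    }
+
+    // Expands into the equivalent dense array of length size.
+    public double[] toDense() {
+        double[] dense = new double[size];
+        for (int k = 0; k < indices.length; k++) {
+            dense[indices[k]] = values[k];
+        }
+        return dense;
+    }
+
+    public static void main(String[] args) {
+        // Same size, indices and values as the Vectors.sparse example above.
+        SimpleSparseVector v =
+                new SimpleSparseVector(4, new int[] {0, 2, 3}, new double[] {0.1, 0.3, 0.4});
+        System.out.println(Arrays.toString(v.toDense())); // [0.1, 0.0, 0.3, 0.4]
+    }
+}
+```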
diff --git a/docs/content/docs/operators/_index.md b/docs/content/docs/operators/_index.md
index c45aa20..5038fcf 100644
--- a/docs/content/docs/operators/_index.md
+++ b/docs/content/docs/operators/_index.md
@@ -3,7 +3,7 @@ title: Operators
icon: <i class="fa fa-book title maindish" aria-hidden="true"></i>
bold: true
bookCollapseSection: true
-weight: 2
+weight: 3
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/classification/_index.md
similarity index 93%
copy from docs/content/docs/operators/clustering/_index.md
copy to docs/content/docs/operators/classification/_index.md
index 86f8a7e..e6f29c5 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/classification/_index.md
@@ -1,9 +1,9 @@
---
-title: Clustering
+title: Classification
bookCollapseSection: true
-weight: 1
+weight: 2
aliases:
- - /operators/clustering/
+ - /operators/classification/
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/classification/knn.md b/docs/content/docs/operators/classification/knn.md
new file mode 100644
index 0000000..4c67c56
--- /dev/null
+++ b/docs/content/docs/operators/classification/knn.md
@@ -0,0 +1,116 @@
+---
+title: "KNN"
+type: docs
+aliases:
+- /operators/classification/knn.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# KNN
+
+K Nearest Neighbors (KNN) is a classification algorithm. The basic assumption of
+KNN is that if most of the K nearest neighbors of the provided sample belong to
+the same label, then it is highly probable that the provided sample also belongs
+to that label.
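+
+The prediction rule described above can be sketched in plain Java. This
+brute-force version is a hypothetical illustration, not Flink ML's `KnnModel`
+implementation. It computes the Euclidean distance from the query to every
+training sample, keeps the `k` closest, and returns the label that occurs most
+often among them. In this sketch, ties go to the label whose first occurrence
+is nearest to the query.
+
+```java
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+// Hypothetical illustration; not the Flink ML KNN implementation.
+public class KnnSketch {
+    record Sample(double[] features, double label) {}
+
+    static double distance(double[] a, double[] b) {
+        double sum = 0.0;
+        for (int i = 0; i < a.length; i++) {
+            double d = a[i] - b[i];
+            sum += d * d;
+        }
+        return Math.sqrt(sum);
+    }
+
+    static double predict(List<Sample> train, double[] query, int k) {
+        // The k training samples closest to the query, nearest first.
+        List<Sample> nearest = train.stream()
+                .sorted(Comparator.comparingDouble(s -> distance(s.features(), query)))
+                .limit(k)
+                .collect(Collectors.toList());
+        // Count votes per label, keeping labels in nearest-first order.
+        Map<Double, Integer> votes = new LinkedHashMap<>();
+        for (Sample s : nearest) {
+            votes.merge(s.label(), 1, Integer::sum);
+        }
+        double best = Double.NaN;
+        int bestCount = 0;
+        for (Map.Entry<Double, Integer> e : votes.entrySet()) {
+            if (e.getValue() > bestCount) {
+                best = e.getKey();
+                bestCount = e.getValue();
+            }
+        }
+        return best;
+    }
+
+    public static void main(String[] args) {
+        List<Sample> train = List.of(
+                new Sample(new double[] {2.0, 3.0}, 1.0),
+                new Sample(new double[] {2.1, 3.1}, 1.0),
+                new Sample(new double[] {2.2, 3.2}, 1.0),
+                new Sample(new double[] {200.1, 300.1}, 2.0),
+                new Sample(new double[] {200.2, 300.2}, 2.0));
+        System.out.println(predict(train, new double[] {4.0, 4.1}, 3));
+    }
+}
+```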
+
+## Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector | `"features"` | Feature vector |
+| labelCol | Integer | `"label"` | Label to predict |
+
+## Output Columns
+
+| Param name | Type | Default | Description |
+| :------------ | :------ | :------------- | :-------------- |
+| predictionCol | Integer | `"prediction"` | Predicted label |
+
+## Parameters
+
+Below are parameters required by `KnnModel`.
+
+| Key | Default | Type | Required | Description |
+| ------------- | -------------- | ------- | -------- | -------------------------------- |
+| K | `5` | Integer | no | The number of nearest neighbors. |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+
+`Knn` needs the parameters above, as well as the following.
+
+| Key | Default | Type | Required | Description |
+| -------- | --------- | ------ | -------- | ------------------ |
+| labelCol | `"label"` | String | no | Label column name. |
+
+## Examples
+
+```java
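+import org.apache.flink.ml.classification.knn.Knn;
+import org.apache.flink.ml.classification.knn.KnnModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);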
+
+List<Row> trainRows =
+ new ArrayList<>(
+ Arrays.asList(
+ Row.of(Vectors.dense(2.0, 3.0), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(200.1, 300.1), 2.0),
+ Row.of(Vectors.dense(200.2, 300.2), 2.0),
+ Row.of(Vectors.dense(200.3, 300.3), 2.0),
+ Row.of(Vectors.dense(200.4, 300.4), 2.0),
+ Row.of(Vectors.dense(200.4, 300.4), 2.0),
+ Row.of(Vectors.dense(200.6, 300.6), 2.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0),
+ Row.of(Vectors.dense(2.3, 3.2), 1.0),
+ Row.of(Vectors.dense(2.3, 3.2), 1.0),
+ Row.of(Vectors.dense(2.8, 3.2), 3.0),
+ Row.of(Vectors.dense(300., 3.2), 4.0),
+ Row.of(Vectors.dense(2.2, 3.2), 1.0),
+ Row.of(Vectors.dense(2.4, 3.2), 5.0),
+ Row.of(Vectors.dense(2.5, 3.2), 5.0),
+ Row.of(Vectors.dense(2.5, 3.2), 5.0),
+ Row.of(Vectors.dense(2.1, 3.1), 1.0)));
+List<Row> predictRows =
+ new ArrayList<>(
+ Arrays.asList(
+ Row.of(Vectors.dense(4.0, 4.1), 5.0),
+ Row.of(Vectors.dense(300, 42), 2.0)));
+Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.of(DenseVector.class))
+ .column("f1", DataTypes.DOUBLE())
+ .build();
+
+DataStream<Row> dataStream = env.fromCollection(trainRows);
+Table trainData = tEnv.fromDataStream(dataStream, schema).as("features", "label");
+DataStream<Row> predDataStream = env.fromCollection(predictRows);
+Table predictData = tEnv.fromDataStream(predDataStream, schema).as("features", "label");
+
+Knn knn = new Knn();
+KnnModel knnModel = knn.fit(trainData);
+Table output = knnModel.transform(predictData)[0];
+
+output.execute().print();
+```
+
diff --git a/docs/content/docs/operators/classification/logisticregression.md b/docs/content/docs/operators/classification/logisticregression.md
new file mode 100644
index 0000000..3727642
--- /dev/null
+++ b/docs/content/docs/operators/classification/logisticregression.md
@@ -0,0 +1,108 @@
+---
+title: "Logistic Regression"
+type: docs
+aliases:
+- /operators/classification/logisticregression.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Logistic Regression
+
+Logistic regression is a special case of the Generalized Linear Model. It is
+widely used to predict a binary response.
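+
+For a binary response, the probability of label 1 can be modeled as the sigmoid
+of the dot product between a coefficient vector and the feature vector. The
+plain-Java sketch below illustrates only this prediction step. It assumes a
+binomial model with already-known coefficients and no intercept term.
+`LogisticPrediction` and its methods are hypothetical and do not reflect how
+Flink ML's `LogisticRegressionModel` is implemented. The returned pair of
+probabilities corresponds to the kind of value described for
+`rawPredictionCol`.
+
+```java
+// Hypothetical illustration of binomial logistic regression prediction.
+public class LogisticPrediction {
+    static double sigmoid(double z) {
+        return 1.0 / (1.0 + Math.exp(-z));
+    }
+
+    static double dot(double[] a, double[] b) {
+        double sum = 0.0;
+        for (int i = 0; i < a.length; i++) {
+            sum += a[i] * b[i];
+        }
+        return sum;
+    }
+
+    // Returns {P(label = 0), P(label = 1)}.
+    static double[] rawPrediction(double[] coefficients, double[] features) {
+        double p = sigmoid(dot(coefficients, features));
+        return new double[] {1.0 - p, p};
+    }
+
+    // Picks the label with the larger probability.
+    static double prediction(double[] coefficients, double[] features) {
+        double[] raw = rawPrediction(coefficients, features);
+        return raw[1] > raw[0] ? 1.0 : 0.0;
+    }
+
+    public static void main(String[] args) {
+        double[] coefficients = {1.0, -2.0};
+        System.out.println(prediction(coefficients, new double[] {3.0, 1.0}));
+    }
+}
+```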
+
+## Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector | `"features"` | Feature vector |
+| labelCol | Integer | `"label"` | Label to predict |
+| weightCol | Double | `"weight"` | Weight of sample |
+
+## Output Columns
+
+| Param name | Type | Default | Description |
+| :--------------- | :------ | :---------------- | :-------------------------------------- |
+| predictionCol | Integer | `"prediction"` | Label of the max probability |
+| rawPredictionCol | Vector | `"rawPrediction"` | Vector of the probability of each label |
+
+## Parameters
+
+Below are parameters required by `LogisticRegressionModel`.
+
+| Key | Default | Type | Required | Description |
+| ---------------- | ----------------- | ------ | -------- | --------------------------- |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+| rawPredictionCol | `"rawPrediction"` | String | no | Raw prediction column name. |
+
+`LogisticRegression` needs the parameters above, as well as the following.
+
+| Key | Default | Type | Required | Description |
+| --------------- | --------- | ------- | -------- | ------------------------------------------------------------ |
+| labelCol | `"label"` | String | no | Label column name. |
+| weightCol | `null` | String | no | Weight column name. |
+| maxIter | `20` | Integer | no | Maximum number of iterations. |
+| reg | `0.` | Double | no | Regularization parameter. |
+| learningRate | `0.1` | Double | no | Learning rate of optimization method. |
+| globalBatchSize | `32` | Integer | no | Global batch size of training algorithms. |
+| tol | `1e-6` | Double | no | Convergence tolerance for iterative algorithms. |
+| multiClass | `"auto"` | String | no | Classification type. Supported values: `"auto"`, `"binomial"`, `"multinomial"`. |
+
+## Examples
+
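+```java
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.classification.logisticregression.LogisticRegression;
+import org.apache.flink.ml.classification.logisticregression.LogisticRegressionModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// Each row holds: features, label, weight.
+List<Row> binomialTrainData =
+        Arrays.asList(
+                Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+                Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+                Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+                Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+                Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+                Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+                Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+                Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+                Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+                Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+Collections.shuffle(binomialTrainData);
+
+Table binomialDataTable =
+        tEnv.fromDataStream(
+                env.fromCollection(
+                        binomialTrainData,
+                        new RowTypeInfo(
+                                new TypeInformation[] {
+                                    TypeInformation.of(DenseVector.class),
+                                    Types.DOUBLE,
+                                    Types.DOUBLE
+                                },
+                                new String[] {"features", "label", "weight"})));
+
+// Train a model that uses the "weight" column as per-sample weights.
+LogisticRegression logisticRegression = new LogisticRegression().setWeightCol("weight");
+LogisticRegressionModel model = logisticRegression.fit(binomialDataTable);
+
+// Apply the model to get the prediction and rawPrediction columns.
+Table output = model.transform(binomialDataTable)[0];
+
+output.execute().print();
+```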
+
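To make the output columns and training parameters of logistic regression more concrete, the following plain-Java sketch (no Flink dependency) implements textbook binomial logistic regression: `rawPrediction` as `[P(label = 0), P(label = 1)]`, `prediction` as the more likely label, and one mini-batch gradient-descent step in which per-sample weights, `learningRate` and `reg` appear. It is an illustrative sketch under stated assumptions, not Flink ML's code: the class `LogisticRegressionSketch` and its methods are hypothetical, there is no separate intercept term, and `reg` is assumed to scale an L2 penalty.

```java
import java.util.Arrays;

/**
 * Hypothetical, dependency-free sketch of binomial logistic regression.
 * It is not the Flink ML implementation; it only illustrates what the
 * rawPrediction/prediction columns hold and how learningRate, reg and
 * per-sample weights enter a gradient-descent update.
 */
class LogisticRegressionSketch {

    /** Logistic (sigmoid) function: maps a real-valued score to a probability in (0, 1). */
    static double sigmoid(double z) {
        return 1.0 / (1.0 + Math.exp(-z));
    }

    static double dot(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            sum += a[i] * b[i];
        }
        return sum;
    }

    /** Returns {P(label = 0), P(label = 1)}. Assumes no separate intercept term. */
    static double[] rawPrediction(double[] coef, double[] features) {
        double p = sigmoid(dot(coef, features));
        return new double[] {1.0 - p, p};
    }

    /** Returns the label (0.0 or 1.0) with the larger probability. */
    static double prediction(double[] coef, double[] features) {
        double[] raw = rawPrediction(coef, features);
        return raw[1] >= raw[0] ? 1.0 : 0.0;
    }

    /**
     * One gradient-descent step over a mini-batch, minimizing the weighted
     * average log loss plus an assumed L2 penalty (reg / 2) * ||coef||^2.
     * Labels must be 0.0 or 1.0.
     */
    static double[] gradientStep(
            double[] coef, double[][] x, double[] y, double[] weight,
            double learningRate, double reg) {
        double[] grad = new double[coef.length];
        double weightSum = 0.0;
        for (int n = 0; n < x.length; n++) {
            // Gradient of the log loss for one sample: (sigmoid(coef . x) - y) * x.
            double error = sigmoid(dot(coef, x[n])) - y[n];
            for (int i = 0; i < coef.length; i++) {
                grad[i] += weight[n] * error * x[n][i];
            }
            weightSum += weight[n];
        }
        double[] next = new double[coef.length];
        for (int i = 0; i < coef.length; i++) {
            next[i] = coef[i] - learningRate * (grad[i] / weightSum + reg * coef[i]);
        }
        return next;
    }

    public static void main(String[] args) {
        // Centered one-dimensional toy data, so no intercept is needed.
        double[][] x = {{-2}, {-1}, {1}, {2}};
        double[] y = {0, 0, 1, 1};
        double[] weight = {1, 1, 1, 1};
        double[] coef = new double[1];
        for (int iter = 0; iter < 20; iter++) {
            coef = gradientStep(coef, x, y, weight, 0.1, 0.0);
        }
        System.out.println("coef = " + Arrays.toString(coef));
        System.out.println("rawPrediction(1.5) = " + Arrays.toString(rawPrediction(coef, new double[] {1.5})));
        System.out.println("prediction(-1.5) = " + prediction(coef, new double[] {-1.5}));
    }
}
```

In this sketch the array passed to `gradientStep` plays roughly the role of one global batch, and a larger `weight` value makes a sample pull the coefficients harder.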
diff --git a/docs/content/docs/operators/classification/naivebayes.md b/docs/content/docs/operators/classification/naivebayes.md
new file mode 100644
index 0000000..73a1867
--- /dev/null
+++ b/docs/content/docs/operators/classification/naivebayes.md
@@ -0,0 +1,100 @@
+---
+title: "Naive Bayes"
+type: docs
+aliases:
+- /operators/classification/naivebayes.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Naive Bayes
+
+Naive Bayes is a multiclass classifier based on Bayes’ theorem. It assumes
+strong (naive) independence between every pair of features.
+
+## Input Columns
+
+| Param name | Type | Default | Description |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector | `"features"` | Feature vector |
+| labelCol | Integer | `"label"` | Label to predict |
+
+## Output Columns
+
+| Param name | Type | Default | Description |
+| :------------ | :------ | :------------- | :-------------- |
+| predictionCol | Integer | `"prediction"` | Predicted label |
+
+## Parameters
+
+Below are the parameters used by `NaiveBayesModel`.
+
+| Key | Default | Type | Required | Description |
+| ------------- | --------------- | ------ | -------- | ----------------------------------------------- |
+| modelType | `"multinomial"` | String | no | The model type. Supported values: `"multinomial"`. |
+| featuresCol | `"features"` | String | no | Features column name. |
+| predictionCol | `"prediction"` | String | no | Prediction column name. |
+
+`NaiveBayes` needs the parameters above as well as those below.
+
+| Key | Default | Type | Required | Description |
+| --------- | --------- | ------ | -------- | ------------------------ |
+| labelCol | `"label"` | String | no | Label column name. |
+| smoothing | `1.0` | Double | no | The smoothing parameter. |
+
+## Examples
+
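+```java
+import org.apache.flink.ml.classification.naivebayes.NaiveBayes;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+// Each training row holds a feature vector and an integer label.
+List<Row> trainData =
+        Arrays.asList(
+                Row.of(Vectors.dense(0, 0.), 11),
+                Row.of(Vectors.dense(1, 0), 10),
+                Row.of(Vectors.dense(1, 1.), 10));
+
+Table trainTable = tEnv.fromDataStream(env.fromCollection(trainData)).as("features", "label");
+
+List<Row> predictData =
+        Arrays.asList(
+                Row.of(Vectors.dense(0, 1.)),
+                Row.of(Vectors.dense(0, 0.)),
+                Row.of(Vectors.dense(1, 0)),
+                Row.of(Vectors.dense(1, 1.)));
+
+Table predictTable = tEnv.fromDataStream(env.fromCollection(predictData)).as("features");
+
+// Every setter below sets its parameter to the documented default value.
+NaiveBayes estimator =
+        new NaiveBayes()
+                .setSmoothing(1.0)
+                .setFeaturesCol("features")
+                .setLabelCol("label")
+                .setPredictionCol("prediction")
+                .setModelType("multinomial");
+
+// Fit on the training table, then predict labels for the prediction table.
+NaiveBayesModel model = estimator.fit(trainTable);
+Table outputTable = model.transform(predictTable)[0];
+
+outputTable.execute().print();
+```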
+
+
+
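As rough intuition for the Naive Bayes `smoothing` parameter, the following plain-Java sketch scores each label by its log prior plus per-feature log likelihoods. It treats each vector element as a categorical value and applies additive (Laplace) smoothing, P(x_j = v | label) = (count + smoothing) / (labelCount + smoothing * numValues_j), where numValues_j is the number of distinct values seen for feature j. This categorical formulation is an assumption made for illustration; it is not taken from the Flink ML source and may differ from how `NaiveBayesModel` computes its probabilities. The class name `NaiveBayesSketch` is hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch of naive Bayes over categorical feature values with
 * additive (Laplace) smoothing. Not the Flink ML implementation.
 */
class NaiveBayesSketch {
    private final double smoothing;
    private int numRows = 0;
    // label -> number of training rows with that label
    private final Map<Integer, Integer> labelCounts = new HashMap<>();
    // label -> feature index -> feature value -> number of occurrences
    private final Map<Integer, Map<Integer, Map<Double, Integer>>> valueCounts = new HashMap<>();
    // feature index -> distinct values seen for that feature during training
    private final Map<Integer, Set<Double>> distinctValues = new HashMap<>();

    NaiveBayesSketch(double smoothing) {
        this.smoothing = smoothing;
    }

    void fit(double[][] features, int[] labels) {
        for (int n = 0; n < features.length; n++) {
            int label = labels[n];
            labelCounts.merge(label, 1, Integer::sum);
            Map<Integer, Map<Double, Integer>> perFeature =
                    valueCounts.computeIfAbsent(label, k -> new HashMap<>());
            for (int j = 0; j < features[n].length; j++) {
                double value = features[n][j];
                perFeature.computeIfAbsent(j, k -> new HashMap<>()).merge(value, 1, Integer::sum);
                distinctValues.computeIfAbsent(j, k -> new HashSet<>()).add(value);
            }
            numRows++;
        }
    }

    /** Returns the label maximizing log P(label) + sum_j log P(x_j | label). */
    int predict(double[] x) {
        int bestLabel = Integer.MIN_VALUE;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<Integer, Integer> entry : labelCounts.entrySet()) {
            int label = entry.getKey();
            int labelCount = entry.getValue();
            double score = Math.log((double) labelCount / numRows);
            Map<Integer, Map<Double, Integer>> perFeature = valueCounts.get(label);
            for (int j = 0; j < x.length; j++) {
                int count = perFeature.getOrDefault(j, Collections.emptyMap()).getOrDefault(x[j], 0);
                int numValues = distinctValues.getOrDefault(j, Collections.emptySet()).size();
                // Smoothing keeps an unseen (label, value) pair from zeroing out the probability.
                score += Math.log((count + smoothing) / (labelCount + smoothing * numValues));
            }
            if (score > bestScore) {
                bestScore = score;
                bestLabel = label;
            }
        }
        return bestLabel;
    }

    public static void main(String[] args) {
        // Same toy data as the Flink ML example above.
        NaiveBayesSketch model = new NaiveBayesSketch(1.0);
        model.fit(new double[][] {{0, 0}, {1, 0}, {1, 1}}, new int[] {11, 10, 10});
        double[][] predictData = {{0, 1}, {0, 0}, {1, 0}, {1, 1}};
        for (double[] row : predictData) {
            System.out.println(Arrays.toString(row) + " -> " + model.predict(row));
        }
    }
}
```

With `smoothing = 1.0` and the toy training data from the Naive Bayes example, this sketch predicts 11 for `[0, 0]` and 10 for the other three rows. Without smoothing, a single unseen (label, value) pair would drive that label's probability to zero.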
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/clustering/_index.md
index 86f8a7e..c7b5b91 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/clustering/_index.md
@@ -1,7 +1,7 @@
---
title: Clustering
bookCollapseSection: true
-weight: 1
+weight: 3
aliases:
- /operators/clustering/
---
diff --git a/docs/content/docs/operators/clustering/kmeans.md b/docs/content/docs/operators/clustering/kmeans.md
index e8f3f20..221f77d 100644
--- a/docs/content/docs/operators/clustering/kmeans.md
+++ b/docs/content/docs/operators/clustering/kmeans.md
@@ -1,6 +1,5 @@
---
title: "Kmeans"
-weight: 1
type: docs
aliases:
- /operators/clustering/kmeans.html
@@ -31,19 +30,19 @@ into a predefined number of clusters.
## Input Columns
-| Param name | Type | Default | Description |
-| :---------- | :------ | :--------- | :------------- |
-| featuresCol | Vector | "features" | Feature vector |
+| Param name | Type | Default | Description |
+| :---------- | :----- | :----------- | :------------- |
+| featuresCol | Vector | `"features"` | Feature vector |
## Output Columns
-| Param name | Type | Default | Description |
-| :------------ | :------ | :----------- | :----------------------- |
-| predictionCol | Integer | "prediction" | Predicted cluster center |
+| Param name | Type | Default | Description |
+| :------------ | :------ | :------------- | :----------------------- |
+| predictionCol | Integer | `"prediction"` | Predicted cluster center |
## Parameters
-Below are parameters required by `KmeansModel`.
+Below are parameters required by `KMeansModel`.
| Key | Default | Type | Required | Description |
| --------------- | ------------------------------- | ------ | -------- | ------------------------------------------------------------ |
@@ -51,7 +50,7 @@ Below are parameters required by `KmeansModel`.
| featuresCol | `"features"` | String | no | Features column name. |
| predictionCol | `"prediction"` | String | no | Prediction column name. |
-`Kmeans` need parameters above and also below.
+`KMeans` needs parameters above and also below.
| Key | Default | Type | Required | Description |
| -------- | ---------- | ------- | -------- | ---------------------------------------------------------- |
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/feature/_index.md
similarity index 94%
copy from docs/content/docs/operators/clustering/_index.md
copy to docs/content/docs/operators/feature/_index.md
index 86f8a7e..a87ec23 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/feature/_index.md
@@ -1,9 +1,9 @@
---
-title: Clustering
+title: Feature Engineering
bookCollapseSection: true
weight: 1
aliases:
- - /operators/clustering/
+ - /operators/feature/
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/feature/onehotencoder.md b/docs/content/docs/operators/feature/onehotencoder.md
new file mode 100644
index 0000000..67aa086
--- /dev/null
+++ b/docs/content/docs/operators/feature/onehotencoder.md
@@ -0,0 +1,83 @@
+---
+title: "One Hot Encoder"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/onehotencoder.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# One Hot Encoder
+
+One-hot encoding maps a categorical feature, represented as a label index, to a
+binary vector with at most a single one-value indicating the presence of a
+specific feature value from among the set of all feature values. This encoding
+allows algorithms which expect continuous features, such as Logistic Regression,
+to use categorical features.
+
+`OneHotEncoder` can transform multiple columns, returning a one-hot-encoded
+output vector column for each input column.
+
+## Input Columns
+
+| Param name | Type | Default | Description |
+| :--------- | :------ | :------ | :---------- |
+| inputCols | Integer | `null` | Label index |
+
+## Output Columns
+
+| Param name | Type | Default | Description |
+| :--------- | :----- | :------ | :-------------------- |
+| outputCols | Vector | `null` | Encoded binary vector |
+
+## Parameters
+
+| Key | Default | Type | Required | Description |
+| ------------- | -------------------------------- | ------- | -------- | ------------------------------------------------------------ |
+| inputCols | `null` | String[] | yes | Input column names. |
+| outputCols | `null` | String[] | yes | Output column names. |
+| handleInvalid | `HasHandleInvalid.ERROR_INVALID` | String | no | Strategy to handle invalid entries. Supported values: `HasHandleInvalid.ERROR_INVALID`, `HasHandleInvalid.SKIP_INVALID` |
+| dropLast | `true` | Boolean | no | Whether to drop the last category. |
+
+## Examples
+
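+```java
+import org.apache.flink.ml.feature.onehotencoder.OneHotEncoder;
+import org.apache.flink.ml.feature.onehotencoder.OneHotEncoderModel;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import java.util.Arrays;
+import java.util.List;
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+List<Row> trainData = Arrays.asList(Row.of(0.0), Row.of(1.0), Row.of(2.0), Row.of(0.0));
+Table trainTable = tEnv.fromDataStream(env.fromCollection(trainData)).as("input");
+
+List<Row> predictData = Arrays.asList(Row.of(0.0), Row.of(1.0), Row.of(2.0));
+Table predictTable = tEnv.fromDataStream(env.fromCollection(predictData)).as("input");
+
+// Fit on the training table, then encode the "input" column into an "output" vector column.
+OneHotEncoder estimator = new OneHotEncoder().setInputCols("input").setOutputCols("output");
+OneHotEncoderModel model = estimator.fit(trainTable);
+Table outputTable = model.transform(predictTable)[0];
+
+outputTable.execute().print();
+```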
+
+
+
+
+
+
+
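For intuition about the One Hot Encoder's `dropLast` and `handleInvalid` parameters, the following plain-Java sketch encodes one column of label indices. It assumes, as Spark ML's `OneHotEncoder` does, that the number of categories is the largest training index plus one, that `dropLast` removes the last category so that it encodes as the all-zero vector, and that the skip strategy drops invalid rows while the error strategy fails. These semantics are assumptions rather than a description of Flink ML's `OneHotEncoderModel`, and `OneHotEncoderSketch` is a hypothetical name.

```java
import java.util.Arrays;
import java.util.Optional;

/**
 * Hypothetical sketch of one-hot encoding a single column of label indices.
 * Not the Flink ML implementation.
 */
class OneHotEncoderSketch {

    /** Number of categories learned from training data: the largest index plus one. */
    static int numCategories(double[] trainIndices) {
        int max = -1;
        for (double value : trainIndices) {
            if (!isIndex(value)) {
                throw new IllegalArgumentException("Not a valid label index: " + value);
            }
            max = Math.max(max, (int) value);
        }
        return max + 1;
    }

    /** A label index must be a finite, non-negative integer value. */
    static boolean isIndex(double value) {
        return value >= 0 && value == Math.floor(value) && !Double.isInfinite(value);
    }

    /**
     * Encodes one index as a binary vector. An index outside [0, numCategories)
     * is dropped (Optional.empty()) when skipInvalid is true and raises an
     * exception otherwise. With dropLast, the last category maps to all zeros.
     */
    static Optional<double[]> encode(
            double value, int numCategories, boolean dropLast, boolean skipInvalid) {
        if (!isIndex(value) || value >= numCategories) {
            if (skipInvalid) {
                return Optional.empty();
            }
            throw new IllegalArgumentException("Invalid label index: " + value);
        }
        int size = dropLast ? numCategories - 1 : numCategories;
        double[] vector = new double[size];
        int index = (int) value;
        if (index < size) {
            vector[index] = 1.0;
        }
        return Optional.of(vector);
    }

    public static void main(String[] args) {
        // Same values as the Flink ML example above.
        int n = numCategories(new double[] {0.0, 1.0, 2.0, 0.0});
        for (double value : new double[] {0.0, 1.0, 2.0}) {
            double[] encoded = encode(value, n, true, false).get();
            System.out.println(value + " -> " + Arrays.toString(encoded));
        }
    }
}
```

Under these assumptions, the training values `0.0, 1.0, 2.0, 0.0` from the One Hot Encoder example give three categories, so `0.0`, `1.0` and `2.0` encode as `[1.0, 0.0]`, `[0.0, 1.0]` and `[0.0, 0.0]`.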
diff --git a/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java b/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java
index 894098f..c39a774 100644
--- a/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java
+++ b/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java
@@ -41,8 +41,8 @@ import static org.apache.flink.ml.builder.GraphNode.StageType;
/**
* A GraphModel acts as a Model. A GraphModel consists of a DAG of stages, each of which could be an
- * Estimator, Model, Transformer or AlgoOperators. When `GraphModel::transform` is called, the
- * stages are executed in a topologically-sorted order. When a stage is executed, its
+ * Estimator, Model, Transformer or AlgoOperator. When `GraphModel::transform` is called, the stages
+ * are executed in a topologically-sorted order. When a stage is executed, its
* `AlgoOperator::transform` method will be called on the input tables (from the input edges) and
* produce output tables to the output edges.
*/