Posted to commits@flink.apache.org by li...@apache.org on 2022/02/18 09:55:22 UTC

[flink-ml] branch master updated: [FLINK-26100][docs] Add doc for ops & key concepts (#62)

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-ml.git


The following commit(s) were added to refs/heads/master by this push:
     new e438515  [FLINK-26100][docs] Add doc for ops & key concepts (#62)
e438515 is described below

commit e4385153e1d9914d598e3007a11ad78fb29b107f
Author: yunfengzhou-hub <yu...@outlook.com>
AuthorDate: Fri Feb 18 17:55:04 2022 +0800

    [FLINK-26100][docs] Add doc for ops & key concepts (#62)
    
    This closes #62.
---
 .../docs/{operators => development}/_index.md      |   5 +-
 docs/content/docs/development/iteration.md         | 225 +++++++++++++++++++
 docs/content/docs/development/overview.md          | 246 +++++++++++++++++++++
 docs/content/docs/development/types.md             |  49 ++++
 docs/content/docs/operators/_index.md              |   2 +-
 .../{clustering => classification}/_index.md       |   6 +-
 docs/content/docs/operators/classification/knn.md  | 116 ++++++++++
 .../operators/classification/logisticregression.md | 108 +++++++++
 .../docs/operators/classification/naivebayes.md    | 100 +++++++++
 docs/content/docs/operators/clustering/_index.md   |   2 +-
 docs/content/docs/operators/clustering/kmeans.md   |  17 +-
 .../operators/{clustering => feature}/_index.md    |   4 +-
 .../docs/operators/feature/onehotencoder.md        |  83 +++++++
 .../org/apache/flink/ml/builder/GraphModel.java    |   4 +-
 14 files changed, 947 insertions(+), 20 deletions(-)

diff --git a/docs/content/docs/operators/_index.md b/docs/content/docs/development/_index.md
similarity index 88%
copy from docs/content/docs/operators/_index.md
copy to docs/content/docs/development/_index.md
index c45aa20..3229296 100644
--- a/docs/content/docs/operators/_index.md
+++ b/docs/content/docs/development/_index.md
@@ -1,7 +1,8 @@
 ---
-title: Operators
-icon: <i class="fa fa-book title maindish" aria-hidden="true"></i>
+title: Development
+icon: <i class="fa fa-code title maindish" aria-hidden="true"></i>
 bold: true
+sectionBreak: true
 bookCollapseSection: true
 weight: 2
 ---
diff --git a/docs/content/docs/development/iteration.md b/docs/content/docs/development/iteration.md
new file mode 100644
index 0000000..6e656be
--- /dev/null
+++ b/docs/content/docs/development/iteration.md
@@ -0,0 +1,225 @@
+---
+title: "Iteration"
+weight: 2
+type: docs
+aliases:
+- /development/iteration.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Iteration
+
+Iteration is a basic building block of an ML library. In machine learning
+algorithms, iteration may be used in offline or online training processes. In
+general, two types of iterations are required, and Flink ML supports both of
+them in order to provide the infrastructure for a variety of algorithms.
+
+1. **Bounded Iteration**: Usually used in the offline case, where the algorithm
+   trains on a bounded dataset and updates the parameters for multiple rounds
+   until convergence.
+2. **Unbounded Iteration**: Usually used in the online case, where the algorithm
+   trains on an unbounded dataset. It accumulates a mini-batch of data and then
+   performs one update to the parameters.
+
+## Iteration Paradigm
+
+An iterative algorithm has the following behavior pattern:
+
+- The iterative algorithm has an ***iteration body*** that is repeatedly invoked
+  until some termination criterion is reached (e.g. after a user-specified number
+  of epochs has been reached). An iteration body is a subgraph of operators that
+  implements the computation logic of, e.g., an iterative machine learning
+  algorithm, whose outputs might be fed back as the inputs of this subgraph.
+- In each invocation, the iteration body updates the model parameters based on
+  the user-provided data as well as the most recent model parameters.
+- The iterative algorithm takes as inputs the user-provided data and the initial
+  model parameters.
+- The iterative algorithm could output arbitrary user-defined information, such
+  as the loss after each epoch, or the final model parameters. 
+
+Therefore, the behavior of an iterative algorithm could be characterized with
+the following iteration paradigm (w.r.t. Flink concepts):
+
+- An iteration-body is a Flink subgraph with the following inputs and outputs:
+  - Inputs: **model-variables** (as a list of data streams) and
+    **user-provided-data** (as another list of data streams)
+  - Outputs: **feedback-model-variables** (as a list of data streams) and
+    **user-observed-outputs** (as a list of data streams)
+- A **termination-condition** that specifies when the iterative execution of the
+  iteration body should terminate.
+- In order to execute an iteration body, a user executes the iteration body
+  with the following inputs and gets the following outputs.
+  - Inputs: **initial-model-variables** (as a list of bounded data streams) and
+    **user-provided-data** (as a list of data streams)
+  - Outputs: the **user-observed-outputs** emitted by the iteration body.
+
+It is important to note that the **model-variables** expected by the iteration
+body are not the same as the **initial-model-variables** provided by the user.
+Instead, **model-variables** are computed as the union of the
+**feedback-model-variables** (emitted by the iteration body) and the
+**initial-model-variables** (provided by the caller of the iteration body).
+Flink ML provides a utility class (see `Iterations`) to run an iteration body
+with the user-provided inputs.
+
+The figure below summarizes the iteration paradigm described above. 
+
+{{<  mermaid >}}
+flowchart LR
+
+subgraph Iteration Body
+union1
+union2
+node11
+node12
+node21
+node22
+nodeX
+end
+
+input0 --> node11
+
+union1 -. feedback .-  node12
+input1 --> union1
+union1 --> node11
+node11 --> nodeX
+nodeX --> node12
+node12 --> output1
+
+input2 --> union2
+union2 --> node21
+node21 --> nodeX
+nodeX --> node22
+node22 --> output2
+union2 -. feedback .-  node22
+
+input0[non-iterate input]
+input1[iterate input]
+input2[iterate input]
+union1[union]
+union2[union]
+node11( )
+node12( )
+nodeX( )
+node21( )
+node22( )
+output1[output]
+output2[output]
+
+{{<  /mermaid >}}
+
+## API
+
+The main entry of Flink ML's iteration lies in the `Iterations` class. It
+provides two public methods, and users may choose either of them based on
+whether the input data is bounded or unbounded.
+
+```java
+public class Iterations {
+  public static DataStreamList iterateUnboundedStreams(
+    DataStreamList initVariableStreams, DataStreamList dataStreams, IterationBody body) {...}
+  ...
+  public static DataStreamList iterateBoundedStreamsUntilTermination(
+    DataStreamList initVariableStreams,
+    ReplayableDataStreamList dataStreams,
+    IterationConfig config,
+    IterationBody body){...}
+}
+```
+
+To construct an iteration, users are required to provide:
+
+- `initVariableStreams`: the initial values of the variable data streams, which
+  are updated in each round.
+- `dataStreams`: the other data streams used inside the iteration, which are not
+  updated.
+- `iterationBody`: specifies the subgraph that updates the variable streams and
+  the outputs.
+
+The `IterationBody` will be invoked with two parameters: the first parameter is
+a list of input variable streams, which are created as the union of the initial
+variable streams and the corresponding feedback variable streams (returned by
+the iteration body); the second parameter is the data streams given to this
+method.
+
+```java
+public interface IterationBody extends Serializable {
+  ...
+  IterationBodyResult process(DataStreamList variableStreams, DataStreamList dataStreams);
+  ...
+}
+```
+
+During the execution of the iteration body, each record involved in the
+iteration has an epoch attached, which marks the progress of the iteration. The
+epoch is computed as follows:
+
+- All records in the initial variable streams and initial data streams have
+  epoch = 0.
+- For any record emitted by an operator into a non-feedback stream, the epoch of
+  the emitted record = the epoch of the input record that triggers this
+  emission. If the record is emitted by `onEpochWatermarkIncremented()`, then the
+  epoch of this record = epochWatermark.
+- For any record emitted by an operator into a feedback variable stream, the
+  epoch of the emitted record = the epoch of the input record that triggers this
+  emission + 1.
+
+The framework delivers a notification at the end of each epoch to operators and
+UDFs that implement `IterationListener`.
+
+```java
+public interface IterationListener<T> {
+  void onEpochWatermarkIncremented(int epochWatermark, Context context, Collector<T> collector)
+    throws Exception;
+  ...
+  void onIterationTerminated(Context context, Collector<T> collector) throws Exception;
+}
+```
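+
+Below is a minimal, hypothetical sketch (not part of Flink ML's code base) of a
+UDF that implements `IterationListener` to aggregate the records of each epoch
+and emit the aggregated result when the epoch watermark advances. The class name
+and the aggregation logic are illustrative assumptions.
+
+```java
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.iteration.IterationListener;
+import org.apache.flink.util.Collector;
+
+// Hypothetical UDF: accumulates double[] records within an epoch and emits the
+// aggregated array once all records of that epoch have been processed.
+public class EpochAwareReducer
+        implements FlatMapFunction<double[], double[]>, IterationListener<double[]> {
+
+    private double[] accumulated;
+
+    @Override
+    public void flatMap(double[] value, Collector<double[]> out) {
+        // Accumulate records of the current epoch; nothing is emitted eagerly.
+        if (accumulated == null) {
+            accumulated = value.clone();
+        } else {
+            for (int i = 0; i < value.length; i++) {
+                accumulated[i] += value[i];
+            }
+        }
+    }
+
+    @Override
+    public void onEpochWatermarkIncremented(
+            int epochWatermark, Context context, Collector<double[]> collector) {
+        // Invoked once all records with epoch <= epochWatermark have been processed.
+        if (accumulated != null) {
+            collector.collect(accumulated);
+            accumulated = null;
+        }
+    }
+
+    @Override
+    public void onIterationTerminated(Context context, Collector<double[]> collector) {
+        // Invoked after the whole iteration terminates; flush any remaining state.
+        if (accumulated != null) {
+            collector.collect(accumulated);
+        }
+    }
+}
+```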
+
+## Example Usage
+
+An example of using iterations is shown below.
+
+```java
+DataStream<double[]> initParameters = ...;
+DataStream<Tuple2<double[], Double>> dataset = ...;
+
+DataStreamList resultStreams =
+    Iterations.iterateBoundedStreamsUntilTermination(
+        DataStreamList.of(initParameters),
+        ReplayableDataStreamList.notReplay(dataset),
+        IterationConfig.newBuilder().setOperatorLifeCycle(OperatorLifeCycle.ALL_ROUND).build(),
+        (variableStreams, dataStreams) -> {
+            DataStream<double[]> modelUpdate = variableStreams.get(0);
+            DataStream<Tuple2<double[], Double>> trainData = dataStreams.get(0);
+            DataStream<double[]> newModelUpdate = ...;
+            DataStream<double[]> modelOutput = ...;
+            return new IterationBodyResult(
+                DataStreamList.of(newModelUpdate),
+                DataStreamList.of(modelOutput));
+        });
+
+DataStream<double[]> finalModel = resultStreams.get(0);
+```
+
+- `initParameters`: input data that needs to be transmitted through the feedback
+  edge.
+- `dataset`: input data that does not need to be transmitted through the
+  feedback edge.
+- `newModelUpdate`: data to be transmitted through the feedback edge.
+- `modelOutput`: the final output of the iteration body.
diff --git a/docs/content/docs/development/overview.md b/docs/content/docs/development/overview.md
new file mode 100644
index 0000000..6075e23
--- /dev/null
+++ b/docs/content/docs/development/overview.md
@@ -0,0 +1,246 @@
+---
+title: "Overview"
+weight: 1
+type: docs
+aliases:
+- /development/overview.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Overview
+
+This document provides a brief introduction to the basic concepts in Flink ML. 
+
+## Table API
+
+Flink ML's API is based on Flink's Table API. The Table API is a
+language-integrated query API for Java, Scala, and Python that allows the
+composition of queries from relational operators such as selection, filter, and
+join in a very intuitive way.
+
+The Table API allows the usage of a wide range of data types. The [Flink Data
+Types](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/types/)
+page provides a list of supported types. In addition to these types, Flink ML
+also provides support for the `Vector` type.
+
+The Table API integrates seamlessly with Flink’s DataStream API. You can easily
+switch between all APIs and libraries that build upon them. Please refer to
+Flink's documentation for how to convert between `Table` and `DataStream`, as
+well as for other usage of the Flink Table API.
+
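+The snippet below is a minimal sketch of such a conversion, assuming a
+`StreamExecutionEnvironment` named `env` and a `StreamTableEnvironment` named
+`tEnv` have already been created.
+
+```java
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.types.Row;
+
+// DataStream -> Table
+DataStream<Long> numbers = env.fromSequence(0, 9);
+Table table = tEnv.fromDataStream(numbers);
+
+// Table -> DataStream
+DataStream<Row> rows = tEnv.toDataStream(table);
+```
+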
+## Stage
+
+A `Stage` is a node in a `Pipeline` or `Graph`. It is the fundamental component
+in Flink ML. This interface is only a concept, and does not have any actual
+functionality. Its subclasses include the following.
+
+- `Estimator`: An `Estimator` is a `Stage` that is responsible for the training
+  process in machine learning algorithms. It implements a `fit()` method that
+  takes a list of tables and produces a `Model`.
+
+- `AlgoOperator`: An `AlgoOperator` is a `Stage` that is used to encode generic
+  multi-input multi-output computation logic. It implements a `transform()`
+  method, which applies certain computation logic on the given input tables and
+  returns a list of result tables.
+
+- `Transformer`: A `Transformer` is an `AlgoOperator` with the semantic
+  difference that it encodes the transformation logic, such that a record in the
+  output typically corresponds to one record in the input. In contrast, an
+  `AlgoOperator` is a better fit to express aggregation logic where a record in
+  the output could be computed from an arbitrary number of records in the input.
+
+- `Model`: A `Model` is a `Transformer` with the extra APIs to set and get model
+  data. It is typically generated by fitting an `Estimator` on a list of tables.
+  It provides `getModelData()` and `setModelData()`, which allow users to
+  explicitly read or write model data tables to the transformer. Each table
+  could be an unbounded stream of model data changes.
+
+A typical usage of `Stage` is to create an `Estimator` instance first, trigger
+its training process by invoking its `fit()` method, and then perform
+predictions with the resulting `Model` instance. This example usage is shown in
+the code below.
+
+```java
+// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
+
+Table trainData = ...;
+Table predictData = ...;
+
+SumEstimator estimator = new SumEstimator();
+SumModel model = estimator.fit(trainData);
+Table predictResult = model.transform(predictData)[0];
+```
+
+## Builders
+
+In order to organize Flink ML stages into more complex structures and to achieve
+advanced functionality, such as chaining data processing and machine learning
+algorithms together, Flink ML provides APIs that help to manage the relationship
+and structure of stages in Flink jobs. The entry points of these APIs are
+`Pipeline` and `Graph`.
+
+### Pipeline
+
+A `Pipeline` acts as an `Estimator`. It consists of an ordered list of stages,
+each of which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`.
+Its `fit()` method goes through all stages of this pipeline in order and does
+the following on each stage until the last `Estimator` (inclusive).
+
+- If a stage is an `Estimator`, it would invoke the stage's `fit()` method with
+  the input tables to generate a `Model`. If there is an `Estimator` after this
+  stage, it would transform the input tables using the generated `Model` to get
+  result tables, then pass the result tables to the next stage as inputs.
+- If a stage is an `AlgoOperator` and there is an `Estimator` after this stage,
+  it would transform the input tables using this stage to get result tables,
+  then pass the result tables to the next stage as inputs.
+
+After all the `Estimator`s are trained to fit their input tables, a new
+`PipelineModel` will be created with the same stages in this pipeline, except
+that all the `Estimator`s in the `PipelineModel` are replaced with the models
+generated in the above process.
+
+A `PipelineModel` acts as a `Model`. It consists of an ordered list of stages,
+each of which could be a `Model`, `Transformer` or `AlgoOperator`. Its
+`transform()` method applies all stages in this `PipelineModel` on the input
+tables in order. The output of one stage is used as the input of the next stage
+(if any). The output of the last stage is returned as the result of this method.
+
+A `Pipeline` can be created by passing a list of `Stage`s to Pipeline's
+constructor. For example,
+
+```java
+// Suppose SumModel is a concrete subclass of Model, SumEstimator is a concrete subclass of Estimator.
+
+Model modelA = new SumModel().setModelData(tEnv.fromValues(10));
+Estimator estimatorA = new SumEstimator();
+Model modelB = new SumModel().setModelData(tEnv.fromValues(30));
+
+List<Stage<?>> stages = Arrays.asList(modelA, estimatorA, modelB);
+Estimator<?, ?> estimator = new Pipeline(stages);
+```
+
+The code above creates a `Pipeline` as follows.
+
+{{< mermaid >}}
+
+graph LR
+
+empty0[ ] --> modelA --> estimatorA --> modelB --> empty1[ ]
+
+style empty0 fill:#FFFFFF, stroke:#FFFFFF;
+style empty1 fill:#FFFFFF, stroke:#FFFFFF;
+
+{{< /mermaid >}}
+
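+As a minimal usage sketch (assuming `trainTable` and `predictTable` are
+pre-existing `Table`s), the `Pipeline` can then be fitted and the resulting
+`PipelineModel` applied like any other `Model`:
+
+```java
+import org.apache.flink.ml.builder.Pipeline;
+import org.apache.flink.ml.builder.PipelineModel;
+
+// Fit the pipeline on the training data, then transform the prediction data.
+Pipeline pipeline = new Pipeline(stages);
+PipelineModel pipelineModel = pipeline.fit(trainTable);
+Table result = pipelineModel.transform(predictTable)[0];
+```
+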
+### Graph
+
+A `Graph` acts as an `Estimator`. A `Graph` consists of a DAG of stages, each of
+which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`. When
+`Graph::fit` is called, the stages are executed in a topologically-sorted order.
+If a stage is an `Estimator`, its `Estimator::fit` method will be called on the
+input tables (from the input edges) to fit a `Model`. Then the `Model` will be
+used to transform the input tables and produce output tables to the output
+edges. If a stage is an `AlgoOperator`, its `AlgoOperator::transform` method
+will be called on the input tables and produce output tables to the output
+edges. The `GraphModel` fitted from a `Graph` consists of the fitted `Models`
+and `AlgoOperators`, corresponding to the `Graph`'s stages.
+
+A `GraphModel` acts as a `Model`. A `GraphModel` consists of a DAG of stages,
+each of which could be an `Estimator`, `Model`, `Transformer` or `AlgoOperator`.
+When `GraphModel::transform` is called, the stages are executed in a
+topologically-sorted order. When a stage is executed, its
+`AlgoOperator::transform` method will be called on the input tables (from the
+input edges) and produce output tables to the output edges.
+
+A `Graph` can be constructed via the `GraphBuilder` class, which provides
+methods like `addAlgoOperator` or `addEstimator` to help add stages to a graph.
+Flink ML also introduces the `TableId` class to represent the input/output of
+a stage and to help express the relationship between stages in a graph, thus
+allowing users to construct the DAG before they have the concrete tables
+available.
+
+The example code below shows how to build a `Graph`.
+
+```java
+// Suppose SumModel is a concrete subclass of Model.
+
+GraphBuilder builder = new GraphBuilder();
+// Creates nodes.
+SumModel stage1 = new SumModel().setModelData(tEnv.fromValues(1));
+SumModel stage2 = new SumModel();
+SumModel stage3 = new SumModel().setModelData(tEnv.fromValues(3));
+// Creates inputs and modelDataInputs.
+TableId input = builder.createTableId();
+TableId modelDataInput = builder.createTableId();
+// Feeds inputs and gets outputs.
+TableId output1 = builder.addAlgoOperator(stage1, input)[0];
+TableId output2 = builder.addAlgoOperator(stage2, output1)[0];
+builder.setModelDataOnModel(stage2, modelDataInput);
+TableId output3 = builder.addAlgoOperator(stage3, output2)[0];
+TableId modelDataOutput = builder.getModelDataFromModel(stage3)[0];
+
+// Builds a Model from the graph.
+TableId[] inputs = new TableId[] {input};
+TableId[] outputs = new TableId[] {output3};
+TableId[] modelDataInputs = new TableId[] {modelDataInput};
+TableId[] modelDataOutputs = new TableId[] {modelDataOutput};
+Model<?> model = builder.buildModel(inputs, outputs, modelDataInputs, modelDataOutputs);
+```
+
+The code above constructs a `Graph` as follows.
+
+{{< mermaid >}}
+
+graph LR
+
+empty0[ ] --> |input| stage1
+stage1 --> |output1| stage2
+empty1[ ] --> |modelDataInput| stage2
+stage2 --> |output2| stage3
+stage3 --> |output3| empty3[ ]
+stage3 --> |modelDataOutput| empty2[ ]
+
+style empty0 fill:#FFFFFF, stroke:#FFFFFF;
+style empty1 fill:#FFFFFF, stroke:#FFFFFF;
+style empty2 fill:#FFFFFF, stroke:#FFFFFF;
+style empty3 fill:#FFFFFF, stroke:#FFFFFF;
+
+{{< /mermaid >}}
+
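+As a minimal follow-up sketch (assuming `inputTable` and `modelDataTable` are
+pre-existing `Table`s), the built `Model` can be supplied with model data and
+then used to transform data:
+
+```java
+// Feed model data to the graph's model-data input, then run the transformation.
+model.setModelData(modelDataTable);
+Table outputTable = model.transform(inputTable)[0];
+```
+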
+## Parameter
+
+Flink ML's `Stage` interface extends `WithParams`, which provides a uniform API
+to get and set parameters.
+
+A `Param` is the definition of a parameter, including its name, class,
+description, default value, and validator.
+
+In order to set the parameter of an algorithm, users can use either of the
+following ways (see the sketch after this list).
+
+- Invoke the parameter's specific set method. For example, in order to set `K`,
+  the number of clusters, of a K-means algorithm, users can directly invoke the
+  `setK()` method on that `KMeans` instance.
+- Pass a parameter map containing new values to the stage through
+  `ReadWriteUtils.updateExistingParams()` method.
+
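+The snippet below sketches both ways, using the K-means algorithm as an example.
+The parameter name `"k"` passed to `getParam()` and the exact signature of
+`updateExistingParams()` are assumptions made for illustration.
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.ml.clustering.kmeans.KMeans;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ReadWriteUtils;
+
+KMeans kmeans = new KMeans();
+
+// Way 1: invoke the parameter-specific setter.
+kmeans.setK(3);
+
+// Way 2: pass a parameter map containing new values (parameter name assumed to be "k").
+Map<Param<?>, Object> paramOverrides = new HashMap<>();
+paramOverrides.put(kmeans.getParam("k"), 5);
+ReadWriteUtils.updateExistingParams(kmeans, paramOverrides);
+```
+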
+If a `Model` is generated through an `Estimator`'s `fit()` method, the `Model`
+inherits the `Estimator` object's parameters. Thus there is no need to set the
+parameters a second time if they are not changed.
diff --git a/docs/content/docs/development/types.md b/docs/content/docs/development/types.md
new file mode 100644
index 0000000..2ffcf0e
--- /dev/null
+++ b/docs/content/docs/development/types.md
@@ -0,0 +1,49 @@
+---
+title: "Data Types"
+weight: 3
+type: docs
+aliases:
+- /development/types.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Data Types
+
+Flink ML supports all data types supported by the Flink Table API, as well as
+the data types listed in the sections below.
+
+## Vector
+
+Flink ML provides support for vectors of double values. A `Vector` in Flink ML
+can be either dense (`DenseVector`) or sparse (`SparseVector`), depending on how
+users create them according to the vector's sparsity. Each vector is initialized
+with a fixed size, and users may get or set the double value at any 0-based
+index location in the vector.
+
+Flink ML also has a class named `Vectors` providing utility methods for
+instantiating vectors.
+
+```java
+int n = 4;
+int[] indices = new int[] {0, 2, 3};
+double[] values = new double[] {0.1, 0.3, 0.4};
+
+SparseVector vector = Vectors.sparse(n, indices, values);
+```
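+
+A dense vector can be created in a similar way. The element access below is a
+small illustrative sketch of the get/set semantics described above.
+
+```java
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+
+DenseVector dense = Vectors.dense(1.0, 2.0, 3.0, 4.0);
+
+double value = dense.get(2); // Returns 3.0.
+dense.set(2, 5.0);           // Updates the value at index 2.
+```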
diff --git a/docs/content/docs/operators/_index.md b/docs/content/docs/operators/_index.md
index c45aa20..5038fcf 100644
--- a/docs/content/docs/operators/_index.md
+++ b/docs/content/docs/operators/_index.md
@@ -3,7 +3,7 @@ title: Operators
 icon: <i class="fa fa-book title maindish" aria-hidden="true"></i>
 bold: true
 bookCollapseSection: true
-weight: 2
+weight: 3
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/classification/_index.md
similarity index 93%
copy from docs/content/docs/operators/clustering/_index.md
copy to docs/content/docs/operators/classification/_index.md
index 86f8a7e..e6f29c5 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/classification/_index.md
@@ -1,9 +1,9 @@
 ---
-title: Clustering
+title: Classification
 bookCollapseSection: true
-weight: 1
+weight: 2
 aliases:
-  - /operators/clustering/
+  - /operators/feature/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/classification/knn.md b/docs/content/docs/operators/classification/knn.md
new file mode 100644
index 0000000..4c67c56
--- /dev/null
+++ b/docs/content/docs/operators/classification/knn.md
@@ -0,0 +1,116 @@
+---
+title: "KNN"
+type: docs
+aliases:
+- /operators/classification/knn.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# KNN
+
+K Nearest Neighbor (KNN) is a classification algorithm. The basic assumption of
+KNN is that if most of the K nearest neighbors of a given sample belong to the
+same label, then it is highly probable that the given sample also belongs to
+that label.
+
+## Input Columns
+
+| Param name  | Type    | Default      | Description      |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector  | `"features"` | Feature vector   |
+| labelCol    | Integer | `"label"`    | Label to predict |
+
+## Output Columns
+
+| Param name    | Type    | Default        | Description     |
+| :------------ | :------ | :------------- | :-------------- |
+| predictionCol | Integer | `"prediction"` | Predicted label |
+
+## Parameters
+
+Below are parameters required by `KnnModel`.
+
+| Key           | Default        | Type    | Required | Description                      |
+| ------------- | -------------- | ------- | -------- | -------------------------------- |
+| K             | `5`            | Integer | no       | The number of nearest neighbors. |
+| featuresCol   | `"features"`   | String  | no       | Features column name.            |
+| predictionCol | `"prediction"` | String  | no       | Prediction column name.          |
+
+In addition to the parameters above, `Knn` also needs the parameters below.
+
+| Key      | Default   | Type   | Required | Description        |
+| -------- | --------- | ------ | -------- | ------------------ |
+| labelCol | `"label"` | String | no       | Label column name. |
+
+## Examples
+
+```java
+import org.apache.flink.ml.classification.knn.Knn;
+import org.apache.flink.ml.classification.knn.KnnModel;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+
+List<Row> trainRows =
+  new ArrayList<>(
+  Arrays.asList(
+    Row.of(Vectors.dense(2.0, 3.0), 1.0),
+    Row.of(Vectors.dense(2.1, 3.1), 1.0),
+    Row.of(Vectors.dense(200.1, 300.1), 2.0),
+    Row.of(Vectors.dense(200.2, 300.2), 2.0),
+    Row.of(Vectors.dense(200.3, 300.3), 2.0),
+    Row.of(Vectors.dense(200.4, 300.4), 2.0),
+    Row.of(Vectors.dense(200.4, 300.4), 2.0),
+    Row.of(Vectors.dense(200.6, 300.6), 2.0),
+    Row.of(Vectors.dense(2.1, 3.1), 1.0),
+    Row.of(Vectors.dense(2.1, 3.1), 1.0),
+    Row.of(Vectors.dense(2.1, 3.1), 1.0),
+    Row.of(Vectors.dense(2.1, 3.1), 1.0),
+    Row.of(Vectors.dense(2.3, 3.2), 1.0),
+    Row.of(Vectors.dense(2.3, 3.2), 1.0),
+    Row.of(Vectors.dense(2.8, 3.2), 3.0),
+    Row.of(Vectors.dense(300., 3.2), 4.0),
+    Row.of(Vectors.dense(2.2, 3.2), 1.0),
+    Row.of(Vectors.dense(2.4, 3.2), 5.0),
+    Row.of(Vectors.dense(2.5, 3.2), 5.0),
+    Row.of(Vectors.dense(2.5, 3.2), 5.0),
+    Row.of(Vectors.dense(2.1, 3.1), 1.0)));
+List<Row> predictRows =
+  new ArrayList<>(
+  Arrays.asList(
+    Row.of(Vectors.dense(4.0, 4.1), 5.0),
+    Row.of(Vectors.dense(300, 42), 2.0)));
+Schema schema =
+  Schema.newBuilder()
+  .column("f0", DataTypes.of(DenseVector.class))
+  .column("f1", DataTypes.DOUBLE())
+  .build();
+
+DataStream<Row> dataStream = env.fromCollection(trainRows);
+Table trainData = tEnv.fromDataStream(dataStream, schema).as("features", "label");
+DataStream<Row> predDataStream = env.fromCollection(predictRows);
+Table predictData = tEnv.fromDataStream(predDataStream, schema).as("features", "label");
+
+Knn knn = new Knn();
+KnnModel knnModel = knn.fit(trainData);
+Table output = knnModel.transform(predictData)[0];
+
+output.execute().print();
+```
+
diff --git a/docs/content/docs/operators/classification/logisticregression.md b/docs/content/docs/operators/classification/logisticregression.md
new file mode 100644
index 0000000..3727642
--- /dev/null
+++ b/docs/content/docs/operators/classification/logisticregression.md
@@ -0,0 +1,108 @@
+---
+title: "Logistic Regression"
+type: docs
+aliases:
+- /operators/classification/logisticregression.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Logistic Regression
+
+Logistic regression is a special case of the Generalized Linear Model. It is
+widely used to predict a binary response.
+
+## Input Columns
+
+| Param name  | Type    | Default      | Description      |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector  | `"features"` | Feature vector   |
+| labelCol    | Integer | `"label"`    | Label to predict |
+| weightCol   | Double  | `"weight"`   | Weight of sample |
+
+## Output Columns
+
+| Param name       | Type    | Default           | Description                             |
+| :--------------- | :------ | :---------------- | :-------------------------------------- |
+| predictionCol    | Integer | `"prediction"`    | Label of the max probability            |
+| rawPredictionCol | Vector  | `"rawPrediction"` | Vector of the probability of each label |
+
+## Parameters
+
+Below are parameters required by `LogisticRegressionModel`.
+
+| Key              | Default           | Type   | Required | Description                 |
+| ---------------- | ----------------- | ------ | -------- | --------------------------- |
+| featuresCol      | `"features"`      | String | no       | Features column name.       |
+| predictionCol    | `"prediction"`    | String | no       | Prediction column name.     |
+| rawPredictionCol | `"rawPrediction"` | String | no       | Raw prediction column name. |
+
+In addition to the parameters above, `LogisticRegression` also needs the parameters below.
+
+| Key             | Default   | Type    | Required | Description                                                  |
+| --------------- | --------- | ------- | -------- | ------------------------------------------------------------ |
+| labelCol        | `"label"` | String  | no       | Label column name.                                           |
+| weightCol       | `null`    | String  | no       | Weight column name.                                          |
+| maxIter         | `20`      | Integer | no       | Maximum number of iterations.                                |
+| reg             | `0.`      | Double  | no       | Regularization parameter.                                    |
+| learningRate    | `0.1`     | Double  | no       | Learning rate of optimization method.                        |
+| globalBatchSize | `32`      | Integer | no       | Global batch size of training algorithms.                    |
+| tol             | `1e-6`    | Double  | no       | Convergence tolerance for iterative algorithms.              |
+| multiClass      | `"auto"`  | String  | no       | Classification type. Supported values: "auto", "binomial", "multinomial" |
+
+## Examples
+
+```java
+import org.apache.flink.ml.classification.logisticregression.LogisticRegression;
+import org.apache.flink.ml.linalg.DenseVector;
+import org.apache.flink.ml.linalg.Vectors;
+
+List<Row> binomialTrainData =
+  Arrays.asList(
+  Row.of(Vectors.dense(1, 2, 3, 4), 0., 1.),
+  Row.of(Vectors.dense(2, 2, 3, 4), 0., 2.),
+  Row.of(Vectors.dense(3, 2, 3, 4), 0., 3.),
+  Row.of(Vectors.dense(4, 2, 3, 4), 0., 4.),
+  Row.of(Vectors.dense(5, 2, 3, 4), 0., 5.),
+  Row.of(Vectors.dense(11, 2, 3, 4), 1., 1.),
+  Row.of(Vectors.dense(12, 2, 3, 4), 1., 2.),
+  Row.of(Vectors.dense(13, 2, 3, 4), 1., 3.),
+  Row.of(Vectors.dense(14, 2, 3, 4), 1., 4.),
+  Row.of(Vectors.dense(15, 2, 3, 4), 1., 5.));
+Collections.shuffle(binomialTrainData);
+
+Table binomialDataTable =
+  tEnv.fromDataStream(
+  env.fromCollection(
+    binomialTrainData,
+    new RowTypeInfo(
+      new TypeInformation[] {
+        TypeInformation.of(DenseVector.class),
+        Types.DOUBLE,
+        Types.DOUBLE
+      },
+      new String[] {"features", "label", "weight"})));
+
+LogisticRegression logisticRegression = new LogisticRegression().setWeightCol("weight");
+LogisticRegressionModel model = logisticRegression.fit(binomialDataTable);
+Table output = model.transform(binomialDataTable)[0];
+
+output.execute().print();
+```
+
diff --git a/docs/content/docs/operators/classification/naivebayes.md b/docs/content/docs/operators/classification/naivebayes.md
new file mode 100644
index 0000000..73a1867
--- /dev/null
+++ b/docs/content/docs/operators/classification/naivebayes.md
@@ -0,0 +1,100 @@
+---
+title: "Naive Bayes"
+type: docs
+aliases:
+- /operators/classification/naivebayes.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Naive Bayes
+
+Naive Bayes is a multiclass classifier based on Bayes’ theorem, which assumes
+strong (naive) independence between every pair of features.
+
+## Input Columns
+
+| Param name  | Type    | Default      | Description      |
+| :---------- | :------ | :----------- | :--------------- |
+| featuresCol | Vector  | `"features"` | Feature vector   |
+| labelCol    | Integer | `"label"`    | Label to predict |
+
+## Output Columns
+
+| Param name    | Type    | Default        | Description     |
+| :------------ | :------ | :------------- | :-------------- |
+| predictionCol | Integer | `"prediction"` | Predicted label |
+
+## Parameters
+
+Below are parameters required by `NaiveBayesModel`.
+
+| Key           | Default         | Type   | Required | Description                                     |
+| ------------- | --------------- | ------ | -------- | ----------------------------------------------- |
+| modelType     | `"multinomial"` | String | no       | The model type. Supported values: "multinomial" |
+| featuresCol   | `"features"`    | String | no       | Features column name.                           |
+| predictionCol | `"prediction"`  | String | no       | Prediction column name.                         |
+
+In addition to the parameters above, `NaiveBayes` also needs the parameters below.
+
+| Key       | Default   | Type   | Required | Description              |
+| --------- | --------- | ------ | -------- | ------------------------ |
+| labelCol  | `"label"` | String | no       | Label column name.       |
+| smoothing | `1.0`     | Double | no       | The smoothing parameter. |
+
+## Examples
+
+```java
+import org.apache.flink.ml.classification.naivebayes.NaiveBayes;
+import org.apache.flink.ml.classification.naivebayes.NaiveBayesModel;
+import org.apache.flink.ml.linalg.Vectors;
+
+List<Row> trainData =
+  Arrays.asList(
+  Row.of(Vectors.dense(0, 0.), 11),
+  Row.of(Vectors.dense(1, 0), 10),
+  Row.of(Vectors.dense(1, 1.), 10));
+
+Table trainTable = tEnv.fromDataStream(env.fromCollection(trainData)).as("features", "label");
+
+List<Row> predictData =
+  Arrays.asList(
+  Row.of(Vectors.dense(0, 1.)),
+  Row.of(Vectors.dense(0, 0.)),
+  Row.of(Vectors.dense(1, 0)),
+  Row.of(Vectors.dense(1, 1.)));
+
+Table predictTable = tEnv.fromDataStream(env.fromCollection(predictData)).as("features");
+
+NaiveBayes estimator =
+  new NaiveBayes()
+  .setSmoothing(1.0)
+  .setFeaturesCol("features")
+  .setLabelCol("label")
+  .setPredictionCol("prediction")
+  .setModelType("multinomial");
+
+NaiveBayesModel model = estimator.fit(trainTable);
+Table outputTable = model.transform(predictTable)[0];
+
+outputTable.execute().print();
+```
+
+
+
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/clustering/_index.md
index 86f8a7e..c7b5b91 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/clustering/_index.md
@@ -1,7 +1,7 @@
 ---
 title: Clustering
 bookCollapseSection: true
-weight: 1
+weight: 3
 aliases:
   - /operators/clustering/
 ---
diff --git a/docs/content/docs/operators/clustering/kmeans.md b/docs/content/docs/operators/clustering/kmeans.md
index e8f3f20..221f77d 100644
--- a/docs/content/docs/operators/clustering/kmeans.md
+++ b/docs/content/docs/operators/clustering/kmeans.md
@@ -1,6 +1,5 @@
 ---
 title: "Kmeans"
-weight: 1
 type: docs
 aliases:
 - /operators/clustering/kmeans.html
@@ -31,19 +30,19 @@ into a predefined number of clusters.
 
 ## Input Columns
 
-| Param name  | Type    | Default    | Description    |
-| :---------- | :------ | :--------- | :------------- |
-| featuresCol | Vector  | "features" | Feature vector |
+| Param name  | Type   | Default      | Description    |
+| :---------- | :----- | :----------- | :------------- |
+| featuresCol | Vector | `"features"` | Feature vector |
 
 ## Output Columns
 
-| Param name    | Type    | Default      | Description              |
-| :------------ | :------ | :----------- | :----------------------- |
-| predictionCol | Integer | "prediction" | Predicted cluster center |
+| Param name    | Type    | Default        | Description              |
+| :------------ | :------ | :------------- | :----------------------- |
+| predictionCol | Integer | `"prediction"` | Predicted cluster center |
 
 ## Parameters
 
-Below are parameters required by `KmeansModel`.
+Below are parameters required by `KMeansModel`.
 
 | Key             | Default                         | Type   | Required | Description                                                  |
 | --------------- | ------------------------------- | ------ | -------- | ------------------------------------------------------------ |
@@ -51,7 +50,7 @@ Below are parameters required by `KmeansModel`.
 | featuresCol     | `"features"`                    | String | no       | Features column name.                                        |
 | predictionCol   | `"prediction"`                  | String | no       | Prediction column name.                                      |
 
-`Kmeans` need parameters above and also below.
+In addition to the parameters above, `KMeans` also needs the parameters below.
 
 | Key      | Default    | Type    | Required | Description                                                |
 | -------- | ---------- | ------- | -------- | ---------------------------------------------------------- |
diff --git a/docs/content/docs/operators/clustering/_index.md b/docs/content/docs/operators/feature/_index.md
similarity index 94%
copy from docs/content/docs/operators/clustering/_index.md
copy to docs/content/docs/operators/feature/_index.md
index 86f8a7e..a87ec23 100644
--- a/docs/content/docs/operators/clustering/_index.md
+++ b/docs/content/docs/operators/feature/_index.md
@@ -1,9 +1,9 @@
 ---
-title: Clustering
+title: Feature Engineering
 bookCollapseSection: true
 weight: 1
 aliases:
-  - /operators/clustering/
+  - /operators/feature/
 ---
 <!--
 Licensed to the Apache Software Foundation (ASF) under one
diff --git a/docs/content/docs/operators/feature/onehotencoder.md b/docs/content/docs/operators/feature/onehotencoder.md
new file mode 100644
index 0000000..67aa086
--- /dev/null
+++ b/docs/content/docs/operators/feature/onehotencoder.md
@@ -0,0 +1,83 @@
+---
+title: "One Hot Encoder"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/onehotencoder.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# One Hot Encoder
+
+One-hot encoding maps a categorical feature, represented as a label index, to a
+binary vector with at most a single one-value indicating the presence of a
+specific feature value from among the set of all feature values. This encoding
+allows algorithms which expect continuous features, such as Logistic Regression,
+to use categorical features.
+
+OneHotEncoder can transform multiple columns, returning a one-hot-encoded output
+vector column for each input column.
+
+## Input Columns
+
+| Param name | Type    | Default | Description |
+| :--------- | :------ | :------ | :---------- |
+| inputCols  | Integer | `null`  | Label index |
+
+## Output Columns
+
+| Param name | Type   | Default | Description           |
+| :--------- | :----- | :------ | :-------------------- |
+| outputCols | Vector | `null`  | Encoded binary vector |
+
+## Parameters
+
+| Key           | Default                          | Type    | Required | Description                                                  |
+| ------------- | -------------------------------- | ------- | -------- | ------------------------------------------------------------ |
+| inputCols     | `null`                           | String  | yes      | Input column names.                                          |
+| outputCols    | `null`                           | String  | yes      | Output column names.                                         |
+| handleInvalid | `HasHandleInvalid.ERROR_INVALID` | String  | No       | Strategy to handle invalid entries. Supported values: `HasHandleInvalid.ERROR_INVALID`, `HasHandleInvalid.SKIP_INVALID` |
+| dropLast      | `true`                           | Boolean | no       | Whether to drop the last category.                           |
+
+## Examples
+
+```java
+import org.apache.flink.ml.feature.onehotencoder.OneHotEncoder;
+import org.apache.flink.ml.feature.onehotencoder.OneHotEncoderModel;
+
+List<Row> trainData = Arrays.asList(Row.of(0.0), Row.of(1.0), Row.of(2.0), Row.of(0.0));
+Table trainTable = tEnv.fromDataStream(env.fromCollection(trainData)).as("input");
+
+List<Row> predictData = Arrays.asList(Row.of(0.0), Row.of(1.0), Row.of(2.0));
+Table predictTable = tEnv.fromDataStream(env.fromCollection(predictData)).as("input");
+
+OneHotEncoder estimator = new OneHotEncoder().setInputCols("input").setOutputCols("output");
+OneHotEncoderModel model = estimator.fit(trainTable);
+Table outputTable = model.transform(predictTable)[0];
+
+outputTable.execute().print();
+```
+
+
+
+
+
+
+
diff --git a/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java b/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java
index 894098f..c39a774 100644
--- a/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java
+++ b/flink-ml-core/src/main/java/org/apache/flink/ml/builder/GraphModel.java
@@ -41,8 +41,8 @@ import static org.apache.flink.ml.builder.GraphNode.StageType;
 
 /**
  * A GraphModel acts as a Model. A GraphModel consists of a DAG of stages, each of which could be an
- * Estimator, Model, Transformer or AlgoOperators. When `GraphModel::transform` is called, the
- * stages are executed in a topologically-sorted order. When a stage is executed, its
+ * Estimator, Model, Transformer or AlgoOperator. When `GraphModel::transform` is called, the stages
+ * are executed in a topologically-sorted order. When a stage is executed, its
  * `AlgoOperator::transform` method will be called on the input tables (from the input edges) and
  * produce output tables to the output edges.
  */