Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/11/15 01:23:22 UTC

[GitHub] [flink-ml] yunfengzhou-hub commented on a diff in pull request #174: [FLINK-29604] Add Estimator and Transformer for CountVectorizer

yunfengzhou-hub commented on code in PR #174:
URL: https://github.com/apache/flink-ml/pull/174#discussion_r1021271301


##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizerModelParams.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.countvectorizer;
+
+import org.apache.flink.ml.common.param.HasInputCol;
+import org.apache.flink.ml.common.param.HasOutputCol;
+import org.apache.flink.ml.param.BooleanParam;
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params for {@link CountVectorizerModel}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface CountVectorizerModelParams<T> extends HasInputCol<T>, HasOutputCol<T> {
+    Param<Double> MIN_TF =
+            new DoubleParam(
+                    "minTF",
+                    "Filter to ignore rare words in a document. For each document,"
+                            + "terms with frequency/count less than the given threshold are ignored. "
+                            + "If this is an integer >= 1, then this specifies a count (of times "
+                            + "the term must appear in the document); if this is a double in [0,1), "
+                            + "then this specifies a fraction (out of the document's token count).",
+                    1.0,
+                    ParamValidators.gtEq(0.0));
+
+    Param<Boolean> BINARY =
+            new BooleanParam(
+                    "binary",
+                    "Binary toggle to control the output vector values. If True, all nonzero "
+                            + "counts (after minTF filter applied) are set to 1.0. This is useful for discrete "
+                            + "probabilistic models that model binary events rather than integer counts.",

Review Comment:
   Would it be better to shorten this parameter's description to "If True, all nonzero counts are set to 1."?
   
   Detailed explanations such as "This is useful ..." could go in the parameter's JavaDoc instead. See the JavaDoc of other parameters, such as `BucketizerParams.SPLITS_ARRAY`, for reference.
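
   As a rough sketch of that split: the short sentence stays in the description string and the longer rationale moves to the JavaDoc. The nested `BooleanParam` and `ModelParams` types below are hypothetical stand-ins so the snippet compiles without flink-ml; the real code would keep using `org.apache.flink.ml.param.BooleanParam` inside `CountVectorizerModelParams`. The default of `false` is assumed from `testParam`.

```java
/** Self-contained sketch; the nested types are stand-ins, not flink-ml's classes. */
final class ParamDocSketch {

    /** Minimal stand-in for flink-ml's BooleanParam so the sketch compiles on its own. */
    static final class BooleanParam {
        final String name;
        final String description;
        final boolean defaultValue;

        BooleanParam(String name, String description, boolean defaultValue) {
            this.name = name;
            this.description = description;
            this.defaultValue = defaultValue;
        }
    }

    /** Stand-in for the params interface under review. */
    interface ModelParams {
        /**
         * Binary toggle to control the output vector values. If true, all nonzero counts (after
         * the minTF filter is applied) are set to 1.0. This is useful for discrete probabilistic
         * models that model binary events rather than integer counts.
         */
        BooleanParam BINARY =
                new BooleanParam("binary", "If True, all nonzero counts are set to 1.0.", false);
    }

    public static void main(String[] args) {
        // The user-facing description is now a single short sentence.
        System.out.println(ModelParams.BINARY.name + ": " + ModelParams.BINARY.description);
    }
}
```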



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizerParams.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.countvectorizer;
+
+import org.apache.flink.ml.param.DoubleParam;
+import org.apache.flink.ml.param.IntParam;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.param.ParamValidators;
+
+/**
+ * Params of {@link CountVectorizer}.
+ *
+ * @param <T> The class type of this instance.
+ */
+public interface CountVectorizerParams<T> extends CountVectorizerModelParams<T> {
+    Param<Integer> VOCABULARY_SIZE =
+            new IntParam(
+                    "vocabularySize",
+                    "Max size of the vocabulary. CountVectorizer will build a vocabulary"

Review Comment:
   We could use a simpler sentence as the description and move the detailed explanation to the JavaDoc. The same applies to the other parameters.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizerModelData.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.countvectorizer;
+
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.MapSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Map;
+
+/**
+ * Model data of {@link CountVectorizerModel}.
+ *
+ * <p>This class also provides methods to convert model data from Table to a data stream, and
+ * classes to save/load model data.
+ */
+public class CountVectorizerModelData {
+
+    public Map<String, Integer> vocabulary;

Review Comment:
   It might be better to keep storing a `String[]` in the model data and convert it to a `Map<String, Integer>` in `CountVectorizerModel`. That keeps this lookup optimization an internal implementation detail.
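
   A minimal sketch of that conversion, assuming the array order defines each term's vector index and the vocabulary holds distinct terms. `VocabularyIndex` and `fromArray` are hypothetical names, not part of this PR; in the model, the map could be built once when the broadcast model data is first read.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper, not part of the PR: builds a term-to-index lookup from an array. */
final class VocabularyIndex {

    /** Maps each term to its position in the array; that position is the vector index. */
    static Map<String, Integer> fromArray(String[] vocabulary) {
        Map<String, Integer> index = new HashMap<>(vocabulary.length * 2);
        for (int i = 0; i < vocabulary.length; i++) {
            index.put(vocabulary[i], i);
        }
        return index;
    }

    public static void main(String[] args) {
        String[] vocabulary = {"c", "a", "b"};
        Map<String, Integer> index = fromArray(vocabulary);
        // Look up the vector index of one term.
        System.out.println(index.get("a"));
    }
}
```

   With this shape, the serialized model data stays a plain array, and the map exists only inside the prediction function.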



Review Comment:
   It might be better to add a brief description of the variables in the model data.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizerModel.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.countvectorizer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A Model which transforms data using the model data computed by {@link CountVectorizer}. */
+public class CountVectorizerModel
+        implements Model<CountVectorizerModel>, CountVectorizerModelParams<CountVectorizerModel> {
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+    private Table modelDataTable;
+
+    public CountVectorizerModel() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public CountVectorizerModel setModelData(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        modelDataTable = inputs[0];
+        return this;
+    }
+
+    @Override
+    public Table[] getModelData() {
+        return new Table[] {modelDataTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+        ReadWriteUtils.saveModelData(
+                CountVectorizerModelData.getModelDataStream(modelDataTable),
+                path,
+                new CountVectorizerModelData.ModelDataEncoder());
+    }
+
+    public static CountVectorizerModel load(StreamTableEnvironment tEnv, String path)
+            throws IOException {
+        CountVectorizerModel model = ReadWriteUtils.loadStageParam(path);
+        Table modelDataTable =
+                ReadWriteUtils.loadModelData(
+                        tEnv, path, new CountVectorizerModelData.ModelDataDecoder());
+        return model.setModelData(modelDataTable);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<Row> dataStream = tEnv.toDataStream(inputs[0]);
+        DataStream<CountVectorizerModelData> countVectorizerModel =
+                CountVectorizerModelData.getModelDataStream(modelDataTable);
+
+        final String broadcastModelKey = "broadcastModelKey";
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(
+                                inputTypeInfo.getFieldTypes(), SparseVectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+
+        DataStream<Row> output =
+                BroadcastUtils.withBroadcastStream(
+                        Collections.singletonList(dataStream),
+                        Collections.singletonMap(broadcastModelKey, countVectorizerModel),
+                        inputList -> {
+                            DataStream input = inputList.get(0);
+                            return input.map(
+                                    new PredictOutputFunction(
+                                            getInputCol(),
+                                            broadcastModelKey,
+                                            getMinTF(),
+                                            getBinary()),
+                                    outputTypeInfo);
+                        });
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    /** This operator loads model data and predicts result. */
+    private static class PredictOutputFunction extends RichMapFunction<Row, Row> {
+
+        private final String inputCol;
+        private final String broadcastKey;
+        private final double minTf;

Review Comment:
   `minTF` might be a better name, consistent with `getMinTF()`.



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizerModel.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.countvectorizer;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.ml.api.Model;
+import org.apache.flink.ml.common.broadcast.BroadcastUtils;
+import org.apache.flink.ml.common.datastream.TableUtils;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.linalg.typeinfo.SparseVectorTypeInfo;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** A Model which transforms data using the model data computed by {@link CountVectorizer}. */
+public class CountVectorizerModel
+        implements Model<CountVectorizerModel>, CountVectorizerModelParams<CountVectorizerModel> {
+
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+    private Table modelDataTable;
+
+    public CountVectorizerModel() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public CountVectorizerModel setModelData(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        modelDataTable = inputs[0];
+        return this;
+    }
+
+    @Override
+    public Table[] getModelData() {
+        return new Table[] {modelDataTable};
+    }
+
+    @Override
+    public void save(String path) throws IOException {
+        ReadWriteUtils.saveMetadata(this, path);
+        ReadWriteUtils.saveModelData(
+                CountVectorizerModelData.getModelDataStream(modelDataTable),
+                path,
+                new CountVectorizerModelData.ModelDataEncoder());
+    }
+
+    public static CountVectorizerModel load(StreamTableEnvironment tEnv, String path)
+            throws IOException {
+        CountVectorizerModel model = ReadWriteUtils.loadStageParam(path);
+        Table modelDataTable =
+                ReadWriteUtils.loadModelData(
+                        tEnv, path, new CountVectorizerModelData.ModelDataDecoder());
+        return model.setModelData(modelDataTable);
+    }
+
+    @Override
+    public Map<Param<?>, Object> getParamMap() {
+        return paramMap;
+    }
+
+    @Override
+    public Table[] transform(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<Row> dataStream = tEnv.toDataStream(inputs[0]);
+        DataStream<CountVectorizerModelData> countVectorizerModel =
+                CountVectorizerModelData.getModelDataStream(modelDataTable);
+
+        final String broadcastModelKey = "broadcastModelKey";
+        RowTypeInfo inputTypeInfo = TableUtils.getRowTypeInfo(inputs[0].getResolvedSchema());
+        RowTypeInfo outputTypeInfo =
+                new RowTypeInfo(
+                        ArrayUtils.addAll(
+                                inputTypeInfo.getFieldTypes(), SparseVectorTypeInfo.INSTANCE),
+                        ArrayUtils.addAll(inputTypeInfo.getFieldNames(), getOutputCol()));
+
+        DataStream<Row> output =
+                BroadcastUtils.withBroadcastStream(
+                        Collections.singletonList(dataStream),
+                        Collections.singletonMap(broadcastModelKey, countVectorizerModel),
+                        inputList -> {
+                            DataStream input = inputList.get(0);
+                            return input.map(
+                                    new PredictOutputFunction(
+                                            getInputCol(),
+                                            broadcastModelKey,
+                                            getMinTF(),
+                                            getBinary()),
+                                    outputTypeInfo);
+                        });
+        return new Table[] {tEnv.fromDataStream(output)};
+    }
+
+    /** This operator loads model data and predicts result. */
+    private static class PredictOutputFunction extends RichMapFunction<Row, Row> {
+
+        private final String inputCol;
+        private final String broadcastKey;
+        private final double minTf;
+        private final boolean binary;
+        private Map<String, Integer> vocabulary;
+
+        public PredictOutputFunction(
+                String inputCol, String broadcastKey, double minTf, boolean binary) {
+            this.inputCol = inputCol;
+            this.broadcastKey = broadcastKey;
+            this.minTf = minTf;
+            this.binary = binary;
+        }
+
+        @Override
+        public Row map(Row row) throws Exception {
+            if (vocabulary == null) {
+                CountVectorizerModelData modelData =
+                        (CountVectorizerModelData)
+                                getRuntimeContext().getBroadcastVariable(broadcastKey).get(0);
+                vocabulary = modelData.vocabulary;
+            }
+
+            String[] document = (String[]) row.getField(inputCol);
+            double[] termCounts = new double[vocabulary.size()];
+            for (String word : document) {
+                if (vocabulary.containsKey(word)) {
+                    termCounts[vocabulary.get(word)] += 1;
+                }
+            }
+
+            double actualMinTF = minTf >= 1.0 ? minTf : document.length * minTf;

Review Comment:
   It might be better to declare this variable as an integer.
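
   One possible way to do this, as a sketch only: round the threshold up to an integer. This assumes a term is kept when its count is at least the threshold, as the `minTF` description states; for integer counts, `count >= x` holds exactly when `count >= ceil(x)`, so rounding up does not change which terms pass. `MinTfThreshold` and `resolve` are hypothetical names, not part of this PR.

```java
/** Hypothetical helper, not part of the PR: resolves minTF to an integer count threshold. */
final class MinTfThreshold {

    /**
     * A minTF of at least 1 is treated as an absolute count; a smaller value is treated as a
     * fraction of the document's token count. Rounding up is an assumption of this sketch.
     */
    static int resolve(double minTF, int documentLength) {
        double threshold = minTF >= 1.0 ? minTF : documentLength * minTF;
        return (int) Math.ceil(threshold);
    }

    public static void main(String[] args) {
        // Absolute count: at least 2 occurrences.
        System.out.println(resolve(2.0, 10));
        // Fraction: a quarter of a 10-token document.
        System.out.println(resolve(0.25, 10));
    }
}
```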



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/CountVectorizerTest.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.countvectorizer.CountVectorizer;
+import org.apache.flink.ml.feature.countvectorizer.CountVectorizerModel;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests {@link CountVectorizer} and {@link CountVectorizerModel}. */
+public class CountVectorizerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainDataTable;
+
+    private static final double EPS = 1.0e-5;
+    private static final List<Row> TRAIN_DATA =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of((Object) new String[] {"a", "c", "b", "c"}),
+                            Row.of((Object) new String[] {"c", "d", "e"}),
+                            Row.of((Object) new String[] {"a", "b", "c"}),
+                            Row.of((Object) new String[] {"e", "f"}),
+                            Row.of((Object) new String[] {"a", "c", "a"})));
+
+    private static final List<SparseVector> EXPECTED_OUTPUT =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 1, 2).toArray(),
+                                    DoubleStream.of(2.0, 1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 3, 4).toArray(),
+                                    DoubleStream.of(1.0, 1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 1, 2).toArray(),
+                                    DoubleStream.of(1.0, 1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(3, 5).toArray(),
+                                    DoubleStream.of(1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 1).toArray(),
+                                    DoubleStream.of(1.0, 2.0).toArray())));
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+
+        trainDataTable = tEnv.fromDataStream(env.fromCollection(TRAIN_DATA)).as("input");
+    }
+
+    private static void verifyPredictionResult(
+            Table output, String outputCol, List<SparseVector> expected) throws Exception {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) output).getTableEnvironment();
+        DataStream<SparseVector> stream =
+                tEnv.toDataStream(output)
+                        .map(
+                                (MapFunction<Row, SparseVector>)
+                                        row -> (SparseVector) row.getField(outputCol));
+        List<SparseVector> result = IteratorUtils.toList(stream.executeAndCollect());
+        compareResultCollections(expected, result, TestUtils::compare);
+    }
+
+    @Test
+    public void testParam() {
+        CountVectorizer countVectorizer = new CountVectorizer();
+        assertEquals("input", countVectorizer.getInputCol());
+        assertEquals("output", countVectorizer.getOutputCol());
+        assertEquals((double) Long.MAX_VALUE, countVectorizer.getMaxDF(), EPS);
+        assertEquals(1.0, countVectorizer.getMinDF(), EPS);
+        assertEquals(1.0, countVectorizer.getMinTF(), EPS);
+        assertEquals(1 << 18, countVectorizer.getVocabularySize());
+        assertFalse(countVectorizer.getBinary());
+
+        countVectorizer
+                .setInputCol("test_input")
+                .setOutputCol("test_output")
+                .setMinDF(0.1)
+                .setMaxDF(0.9)
+                .setMinTF(10)
+                .setVocabularySize(1000)
+                .setBinary(true);
+        assertEquals("test_input", countVectorizer.getInputCol());
+        assertEquals("test_output", countVectorizer.getOutputCol());
+        assertEquals(0.9, countVectorizer.getMaxDF(), EPS);
+        assertEquals(0.1, countVectorizer.getMinDF(), EPS);
+        assertEquals(10, countVectorizer.getMinTF(), EPS);
+        assertEquals(1000, countVectorizer.getVocabularySize());
+        assertTrue(countVectorizer.getBinary());
+    }
+
+    @Test
+    public void testInvalidParam() {

Review Comment:
   This test case only covers `minDF` and `maxDF`, not invalid parameters in general. We could rename it to reflect that, or add tests for the other parameters that can be invalid.



##########
flink-ml-lib/src/test/java/org/apache/flink/ml/feature/CountVectorizerTest.java:
##########
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.ml.feature.countvectorizer.CountVectorizer;
+import org.apache.flink.ml.feature.countvectorizer.CountVectorizerModel;
+import org.apache.flink.ml.linalg.SparseVector;
+import org.apache.flink.ml.linalg.Vectors;
+import org.apache.flink.ml.util.TestUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.DoubleStream;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests {@link CountVectorizer} and {@link CountVectorizerModel}. */
+public class CountVectorizerTest extends AbstractTestBase {
+    @Rule public final TemporaryFolder tempFolder = new TemporaryFolder();
+    private StreamExecutionEnvironment env;
+    private StreamTableEnvironment tEnv;
+    private Table trainDataTable;
+
+    private static final double EPS = 1.0e-5;
+    private static final List<Row> TRAIN_DATA =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Row.of((Object) new String[] {"a", "c", "b", "c"}),
+                            Row.of((Object) new String[] {"c", "d", "e"}),
+                            Row.of((Object) new String[] {"a", "b", "c"}),
+                            Row.of((Object) new String[] {"e", "f"}),
+                            Row.of((Object) new String[] {"a", "c", "a"})));
+
+    private static final List<SparseVector> EXPECTED_OUTPUT =
+            new ArrayList<>(
+                    Arrays.asList(
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 1, 2).toArray(),
+                                    DoubleStream.of(2.0, 1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 3, 4).toArray(),
+                                    DoubleStream.of(1.0, 1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 1, 2).toArray(),
+                                    DoubleStream.of(1.0, 1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(3, 5).toArray(),
+                                    DoubleStream.of(1.0, 1.0).toArray()),
+                            Vectors.sparse(
+                                    6,
+                                    IntStream.of(0, 1).toArray(),
+                                    DoubleStream.of(1.0, 2.0).toArray())));
+
+    @Before
+    public void before() {
+        Configuration config = new Configuration();
+        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
+        env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        env.setParallelism(4);
+        env.enableCheckpointing(100);
+        env.setRestartStrategy(RestartStrategies.noRestart());
+        tEnv = StreamTableEnvironment.create(env);
+
+        trainDataTable = tEnv.fromDataStream(env.fromCollection(TRAIN_DATA)).as("input");
+    }
+
+    private static void verifyPredictionResult(
+            Table output, String outputCol, List<SparseVector> expected) throws Exception {
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) output).getTableEnvironment();
+        DataStream<SparseVector> stream =
+                tEnv.toDataStream(output)
+                        .map(
+                                (MapFunction<Row, SparseVector>)
+                                        row -> (SparseVector) row.getField(outputCol));
+        List<SparseVector> result = IteratorUtils.toList(stream.executeAndCollect());
+        compareResultCollections(expected, result, TestUtils::compare);
+    }
+
+    @Test
+    public void testParam() {
+        CountVectorizer countVectorizer = new CountVectorizer();
+        assertEquals("input", countVectorizer.getInputCol());
+        assertEquals("output", countVectorizer.getOutputCol());
+        assertEquals((double) Long.MAX_VALUE, countVectorizer.getMaxDF(), EPS);
+        assertEquals(1.0, countVectorizer.getMinDF(), EPS);
+        assertEquals(1.0, countVectorizer.getMinTF(), EPS);
+        assertEquals(1 << 18, countVectorizer.getVocabularySize());
+        assertFalse(countVectorizer.getBinary());
+
+        countVectorizer
+                .setInputCol("test_input")
+                .setOutputCol("test_output")
+                .setMinDF(0.1)
+                .setMaxDF(0.9)
+                .setMinTF(10)
+                .setVocabularySize(1000)
+                .setBinary(true);
+        assertEquals("test_input", countVectorizer.getInputCol());
+        assertEquals("test_output", countVectorizer.getOutputCol());
+        assertEquals(0.9, countVectorizer.getMaxDF(), EPS);
+        assertEquals(0.1, countVectorizer.getMinDF(), EPS);
+        assertEquals(10, countVectorizer.getMinTF(), EPS);
+        assertEquals(1000, countVectorizer.getVocabularySize());
+        assertTrue(countVectorizer.getBinary());
+    }
+
+    @Test
+    public void testInvalidParam() {
+        String errMessage = "maxDF must be >= minDF.";
+        CountVectorizer countVectorizer = new CountVectorizer();
+        countVectorizer.setMaxDF(0.1);
+        countVectorizer.setMinDF(0.2);
+        try {
+            countVectorizer.fit(trainDataTable);
+            fail();
+        } catch (Throwable e) {
+            assertEquals(errMessage, e.getMessage());
+        }
+        countVectorizer.setMaxDF(1);
+        countVectorizer.setMinDF(2);
+        try {
+            countVectorizer.fit(trainDataTable);
+            fail();
+        } catch (Throwable e) {
+            assertEquals(errMessage, e.getMessage());
+        }
+        countVectorizer.setMaxDF(1);
+        countVectorizer.setMinDF(0.9);
+        try {
+            CountVectorizerModel model = countVectorizer.fit(trainDataTable);
+            Table output = model.transform(trainDataTable)[0];
+            output.execute().print();
+            fail();
+        } catch (Throwable e) {
+            assertEquals(errMessage, ExceptionUtils.getRootCause(e).getMessage());
+        }
+        countVectorizer.setMaxDF(0.1);
+        countVectorizer.setMinDF(10);
+        try {
+            CountVectorizerModel model = countVectorizer.fit(trainDataTable);
+            Table output = model.transform(trainDataTable)[0];
+            output.execute().print();
+            fail();
+        } catch (Throwable e) {
+            assertEquals(errMessage, ExceptionUtils.getRootCause(e).getMessage());
+        }
+    }
+
+    @Test
+    public void testOutputSchema() {
+        CountVectorizer countVectorizer = new CountVectorizer();
+        CountVectorizerModel model = countVectorizer.fit(trainDataTable);
+        Table output = model.transform(trainDataTable)[0];
+        assertEquals(Arrays.asList("input", "output"), output.getResolvedSchema().getColumnNames());

Review Comment:
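   Let's set the input and output column names to values different from the defaults (`"input"` and `"output"`), so that this test verifies custom names are respected.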



##########
flink-ml-lib/src/main/java/org/apache/flink/ml/feature/countvectorizer/CountVectorizer.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.feature.countvectorizer;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.ml.api.Estimator;
+import org.apache.flink.ml.common.datastream.DataStreamUtils;
+import org.apache.flink.ml.param.Param;
+import org.apache.flink.ml.util.ParamUtils;
+import org.apache.flink.ml.util.ReadWriteUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.internal.TableImpl;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * {@link CountVectorizer} aims to help convert a collection of text documents to vectors of token
+ * counts. When an a-priori dictionary is not available, {@link CountVectorizer} can be used as an
+ * estimator to extract the vocabulary, and generates a {@link CountVectorizerModel}. The model
+ * produces sparse representations for the documents over the vocabulary, which can then be passed
+ * to other algorithms like LDA.
+ */
+public class CountVectorizer
+        implements Estimator<CountVectorizer, CountVectorizerModel>,
+                CountVectorizerParams<CountVectorizer> {
+    private final Map<Param<?>, Object> paramMap = new HashMap<>();
+
+    public CountVectorizer() {
+        ParamUtils.initializeMapWithDefaultValues(paramMap, this);
+    }
+
+    @Override
+    public CountVectorizerModel fit(Table... inputs) {
+        Preconditions.checkArgument(inputs.length == 1);
+        double minDF = getMinDF();
+        double maxDF = getMaxDF();
+        if (minDF >= 1.0 && maxDF >= 1.0 || minDF < 1.0 && maxDF < 1.0) {
+            Preconditions.checkArgument(maxDF >= minDF, "maxDF must be >= minDF.");
+        }
+
+        String inputCol = getInputCol();
+        StreamTableEnvironment tEnv =
+                (StreamTableEnvironment) ((TableImpl) inputs[0]).getTableEnvironment();
+        DataStream<String[]> inputData =
+                tEnv.toDataStream(inputs[0])
+                        .map(
+                                (MapFunction<Row, String[]>)
+                                        value -> ((String[]) value.getField(inputCol)));
+
+        DataStream<CountVectorizerModelData> modelData =
+                DataStreamUtils.aggregate(
+                        inputData,
+                        new VocabularyAggregator(getMinDF(), getMaxDF(), getVocabularySize()));
+
+        CountVectorizerModel model =
+                new CountVectorizerModel().setModelData(tEnv.fromDataStream(modelData));
+        ReadWriteUtils.updateExistingParams(model, getParamMap());
+        return model;
+    }
+
+    /**
+     * Extracts a vocabulary from input document collections and builds the {@link
+     * CountVectorizerModelData}.
+     */
+    private static class VocabularyAggregator
+            implements AggregateFunction<
+                    String[],
+                    Tuple2<Long, Map<String, Tuple2<Long, Long>>>,
+                    CountVectorizerModelData> {
+        private final double minDF;
+        private final double maxDF;
+        private final int vocabularySize;
+
+        public VocabularyAggregator(double minDF, double maxDF, int vocabularySize) {
+            this.minDF = minDF;
+            this.maxDF = maxDF;
+            this.vocabularySize = vocabularySize;
+        }
+
+        @Override
+        public Tuple2<Long, Map<String, Tuple2<Long, Long>>> createAccumulator() {
+            return Tuple2.of(0L, new HashMap<>());
+        }
+
+        @Override
+        public Tuple2<Long, Map<String, Tuple2<Long, Long>>> add(
+                String[] terms, Tuple2<Long, Map<String, Tuple2<Long, Long>>> vocabAccumulator) {
+            Map<String, Long> wc = new HashMap<>();
+            Arrays.stream(terms)
+                    .forEach(
+                            x -> {
+                                if (wc.containsKey(x)) {
+                                    wc.put(x, wc.get(x) + 1);
+                                } else {
+                                    wc.put(x, 1L);
+                                }
+                            });
+
+            Map<String, Tuple2<Long, Long>> counts = vocabAccumulator.f1;
+            wc.forEach(
+                    (w, c) -> {
+                        if (counts.containsKey(w)) {
+                            counts.get(w).f0 += c;
+                            counts.get(w).f1 += 1;
+                        } else {
+                            counts.put(w, Tuple2.of(c, 1L));
+                        }
+                    });
+            vocabAccumulator.f0 += 1;
+
+            return vocabAccumulator;
+        }
+
+        @Override
+        public CountVectorizerModelData getResult(
+                Tuple2<Long, Map<String, Tuple2<Long, Long>>> vocabAccumulator) {
+            Preconditions.checkState(vocabAccumulator.f0 > 0, "The training set is empty.");
+
+            boolean filteringRequired =
+                    !MIN_DF.defaultValue.equals(minDF) || !MAX_DF.defaultValue.equals(maxDF);
+            if (filteringRequired) {
+                long rowNum = vocabAccumulator.f0;
+                double actualMinDF = minDF >= 1.0 ? minDF : minDF * rowNum;
+                double actualMaxDF = maxDF >= 1.0 ? maxDF : maxDF * rowNum;

Review Comment:
   It might be better to declare these two variables as integers, as they represent the number of documents.
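
As a rough sketch of this suggestion (not code from the PR): the thresholds could be converted to whole document counts once and then compared as `long` values. The class and method names below are hypothetical, and the rounding direction (ceiling for the minimum, floor for the maximum) and the inclusive range check are assumptions rather than behavior confirmed by the PR.

```java
/**
 * Sketch only: converts minDF / maxDF settings into whole document counts.
 * Names are hypothetical; rounding direction is an assumption.
 */
final class DocFrequencyThresholds {
    private DocFrequencyThresholds() {}

    /** Minimum number of documents a term must appear in. */
    static long minDocCount(double minDF, long rowNum) {
        // Values >= 1.0 are absolute counts; smaller values are fractions of rowNum.
        double raw = minDF >= 1.0 ? minDF : minDF * rowNum;
        // Assumption: round up, so a fractional minimum is never relaxed.
        return (long) Math.ceil(raw);
    }

    /** Maximum number of documents a term may appear in. */
    static long maxDocCount(double maxDF, long rowNum) {
        double raw = maxDF >= 1.0 ? maxDF : maxDF * rowNum;
        // Assumption: round down. A double-to-long cast saturates, so
        // (double) Long.MAX_VALUE maps back to Long.MAX_VALUE.
        return (long) Math.floor(raw);
    }

    /** Assumption: a term is kept when minCount <= docFreq <= maxCount. */
    static boolean keep(long docFreq, long minCount, long maxCount) {
        return docFreq >= minCount && docFreq <= maxCount;
    }
}
```

With five training rows, for example, `minDocCount(0.5, 5)` yields 3 and `maxDocCount(0.9, 5)` yields 4 under these rounding assumptions. Because the fractional case multiplies doubles, a product may land slightly above or below a whole number, so the chosen rounding rule would need to be documented.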



##########
docs/content/docs/operators/feature/countvectorizer.md:
##########
@@ -0,0 +1,182 @@
+---
+title: "Count Vectorizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/countvectorizer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Count Vectorizer
+
+CountVectorizer aims to help convert a collection of text documents to
+vectors of token counts. When an a-priori dictionary is not available,
+CountVectorizer can be used as an estimator to extract the vocabulary,
+and generates a CountVectorizerModel. The model produces sparse
+representations for the documents over the vocabulary, which can then
+be passed to other algorithms like LDA.
+
+### Input Columns
+
+| Param name | Type     | Default   | Description         |
+|:-----------|:---------|:----------|:--------------------|
+| inputCol   | String[] | `"input"` | Input string array. |
+
+### Output Columns
+
+| Param name | Type         | Default    | Description             |
+|:-----------|:-------------|:-----------|:------------------------|
+| outputCol  | SparseVector | `"output"` | Vector of token counts. |
+
+### Parameters
+
+Below are the parameters required by `CountVectorizerModel`.
+
+| Key        | Default    | Type    | Required | Description                                                                                                                                                                                                                                                                                                                                     |
+|------------|------------|---------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| inputCol   | `"input"`  | String  | no       | Input column name.                                                                                                                                                                                                                                                                                                                              |
+| outputCol  | `"output"` | String  | no       | Output column name.                                                                                                                                                                                                                                                                                                                             |
+| minTF      | `1.0`      | Double  | no       | Filter to ignore rare words in a document. For each document, terms with frequency/count less than the given threshold are ignored. If this is an integer >= 1, then this specifies a count (of times the term must appear in the document); if this is a double in [0,1), then this specifies a fraction (out of the document's token count).  |
+| binary     | `false`    | Boolean | no       | Binary toggle to control the output vector values. If True, all nonzero counts (after minTF filter applied) are set to 1.0. This is useful for discrete probabilistic models that model binary events rather than integer counts.                                                                                                               |
+
+`CountVectorizer` needs parameters above and also below.
+
+| Key            | Default    | Type     | Required | Description                                                                                                                                                                                                                                                                                                                                                                                  |
+|:---------------|:-----------|:---------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| vocabularySize | `262144`   | Integer  | no       | Max size of the vocabulary. CountVectorizer will build a vocabulary that only considers the top vocabulary size terms ordered by term frequency across the corpus.                                                                                                                                                                                                                           |
+| minDF          | `1.0`      | Double   | no       | Specifies the minimum number of different documents a term must appear in to be included in the vocabulary. If this is an integer >= 1, this specifies the number of documents the term must appear in; if this is a double in [0,1), then this specifies the fraction of documents.                                                                                                         |
+| maxDF          | `2^65 - 1` | Double   | no       | Specifies the maximum number of different documents a term could appear in to be included in the vocabulary. A term that appears more than the threshold will be ignored. If this is an integer >= 1, this specifies the maximum number of documents the term could appear in; if this is a double in [0,1), then this specifies the maximum fraction of documents the term could appear in. |

Review Comment:
   The default should read `2^63 - 1` (that is, `Long.MAX_VALUE`), not `2^65 - 1`.
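
The arithmetic behind this correction can be checked with a small standalone sketch. The class and method names are hypothetical and nothing here comes from the PR itself; it only relies on `Long.MAX_VALUE`, which the unit test uses as the expected default for `maxDF`.

```java
import java.math.BigInteger;

/** Sketch only: verifies the value of Long.MAX_VALUE with exact integer arithmetic. */
final class MaxDfDefaultCheck {
    private MaxDfDefaultCheck() {}

    /** Returns 2^exponent - 1 as an exact integer. */
    static BigInteger twoToThePowerMinusOne(int exponent) {
        return BigInteger.valueOf(2).pow(exponent).subtract(BigInteger.ONE);
    }

    /** True if Long.MAX_VALUE equals 2^63 - 1 exactly. */
    static boolean longMaxIsTwoPow63MinusOne() {
        return twoToThePowerMinusOne(63).equals(BigInteger.valueOf(Long.MAX_VALUE));
    }

    /** True if the double nearest to Long.MAX_VALUE is exactly 2^63 (hex literal 0x1p63). */
    static boolean doubleOfLongMaxIsTwoPow63() {
        return (double) Long.MAX_VALUE == 0x1p63;
    }
}
```

Both checks return `true`. The second one is a side note: since the parameter is a `Double`, the stored default is the nearest representable double, which is `2^63`, so `2^63 - 1` in the docs describes the intended `Long.MAX_VALUE` rather than the exact double value.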



##########
docs/content/docs/operators/feature/countvectorizer.md:
##########
@@ -0,0 +1,182 @@
+---
+title: "Count Vectorizer"
+weight: 1
+type: docs
+aliases:
+- /operators/feature/countvectorizer.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions dand limitations
+under the License.
+-->
+
+## Count Vectorizer
+
+CountVectorizer aims to help convert a collection of text documents to
+vectors of token counts. When an a-priori dictionary is not available,
+CountVectorizer can be used as an estimator to extract the vocabulary,
+and generates a CountVectorizerModel. The model produces sparse
+representations for the documents over the vocabulary, which can then
+be passed to other algorithms like LDA.
+
+### Input Columns
+
+| Param name | Type     | Default   | Description         |
+|:-----------|:---------|:----------|:--------------------|
+| inputCol   | String[] | `"input"` | Input string array. |
+
+### Output Columns
+
+| Param name | Type         | Default    | Description             |
+|:-----------|:-------------|:-----------|:------------------------|
+| outputCol  | SparseVector | `"output"` | Vector of token counts. |
+
+### Parameters
+
+Below are the parameters required by `CountVectorizerModel`.
+
+| Key        | Default    | Type    | Required | Description                                                                                                                                                                                                                                                                                                                                     |
+|------------|------------|---------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| inputCol   | `"input"`  | String  | no       | Input column name.                                                                                                                                                                                                                                                                                                                              |
+| outputCol  | `"output"` | String  | no       | Output column name.                                                                                                                                                                                                                                                                                                                             |
+| minTF      | `1.0`      | Double  | no       | Filter to ignore rare words in a document. For each document, terms with frequency/count less than the given threshold are ignored. If this is an integer >= 1, then this specifies a count (of times the term must appear in the document); if this is a double in [0,1), then this specifies a fraction (out of the document's token count).  |
+| binary     | `false`    | Boolean | no       | Binary toggle to control the output vector values. If True, all nonzero counts (after minTF filter applied) are set to 1.0. This is useful for discrete probabilistic models that model binary events rather than integer counts.                                                                                                               |
+
+`CountVectorizer` needs parameters above and also below.
+
+| Key            | Default    | Type     | Required | Description                                                                                                                                                                                                                                                                                                                                                                                  |
+|:---------------|:-----------|:---------|:---------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| vocabularySize | `262144`   | Integer  | no       | Max size of the vocabulary. CountVectorizer will build a vocabulary that only considers the top vocabulary size terms ordered by term frequency across the corpus.                                                                                                                                                                                                                           |

Review Comment:
   Would it be better to stay consistent with `maxDF` and write `2^18` here instead of `262144`?


