Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/04/08 12:46:18 UTC

[GitHub] [incubator-doris] EmmyMiao87 commented on a change in pull request #5375: [Feature] Flink Doris Connector (#5372)

EmmyMiao87 commented on a change in pull request #5375:
URL: https://github.com/apache/incubator-doris/pull/5375#discussion_r609624979



##########
File path: docs/en/extending-doris/flink-doris-connector.md
##########
@@ -0,0 +1,129 @@
+---
+{
+    "title": "Flink Doris Connector",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Doris Connector
+
+Flink Doris Connector can support reading data stored in Doris through Flink.
+
+- You can map the `Doris` table to` DataStream` or `Table`.
+
+## Version Compatibility
+
+| Connector | Flink | Doris  | Java | Scala |
+| --------- | ----- | ------ | ---- | ----- |
+| 1.0.0     | 1.11.2   | 0.14.7  | 8    | 2.12  |
+
+
+## Build and Install
+
+Execute following command in dir `extension/flink-doris-connector/`:
+
+```bash
+sh build.sh
+```
+
+After successful compilation, the file `doris-flink-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to `ClassPath` in `Flink` to use `Flink-Doris-Connector`. For example, `Flink` running in `Local` mode, put this file in the `jars/` folder. `Flink` running in `Yarn` cluster mode, put this file in the pre-deployment package.
+
+## Example

Review comment:
       ```suggestion
   ## How to use
   ```

##########
File path: docs/zh-CN/extending-doris/flink-doris-connector.md
##########
@@ -0,0 +1,133 @@
+---
+{
+    "title": "Flink Doris Connector",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Doris Connector
+
+Flink Doris Connector 可以支持通过 Flink 读取 Doris 中存储的数据。
+
+- 可以将`Doris`表映射为`DataStream`或者`Table`。
+
+## 版本兼容
+
+| Connector | Flink | Doris  | Java | Scala |
+| --------- | ----- | ------ | ---- | ----- |
+| 1.0.0     | 1.11.2   | 0.14.7  | 8    | 2.12  |
+
+
+## 编译与安装
+
+在 `extension/flink-doris-connector/` 源码目录下执行:
+
+```bash
+sh build.sh
+```
+
+编译成功后,会在 `output/` 目录下生成文件 `doris-flink-1.0.0-SNAPSHOT.jar`。将此文件复制到 `Flink` 的 `ClassPath` 中即可使用 `Flink-Doris-Connector`。例如,`Local` 模式运行的 `Flink`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Flink`,则将此文件放入预部署包中。

Review comment:
       I tried building it and found two jar files after the build finished: one named doris-flink-1.0-SNAPSHOT.jar and one named original-doris-flink-1.0-SNAPSHOT.jar. What is original-doris-flink-1.0-SNAPSHOT.jar used for?

##########
File path: extension/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/ConfigurationOptions.java
##########
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.cfg;
+
+public interface ConfigurationOptions {
+    // doris fe node address
+    String DORIS_FENODES = "doris.fenodes";
+
+    String DORIS_DEFAULT_CLUSTER = "default_cluster";
+
+    String TABLE_IDENTIFIER = "table.identifier";
+    String DORIS_TABLE_IDENTIFIER = "doris.table.identifier";
+    String DORIS_READ_FIELD = "doris.read.field";
+    String DORIS_FILTER_QUERY = "doris.filter.query";
+    String DORIS_FILTER_QUERY_IN_MAX_COUNT = "doris.filter.query.in.max.count";
+    int DORIS_FILTER_QUERY_IN_VALUE_UPPER_LIMIT = 10000;
+
+    String DORIS_USER = "doris.user";
+    String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
+    // use password to save doris.request.auth.password
+    // reuse credentials mask method in spark ExternalCatalogUtils#maskCredentials
+    String DORIS_PASSWORD = "doris.password";

Review comment:
       Is there any difference between these two pairs of user and password options?

##########
File path: extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.serialization;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.memory.RootAllocator;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.types.Types;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.thrift.TScanBatchResult;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * row batch data container.
+ */
+public class RowBatch {
+    private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
+
+    public static class Row {
+        private List<Object> cols;
+
+        Row(int colCount) {
+            this.cols = new ArrayList<>(colCount);
+        }
+
+        public  List<Object> getCols() {
+            return cols;
+        }
+
+        public void put(Object o) {
+            cols.add(o);
+        }
+    }
+
+    // offset for iterate the rowBatch
+    private int offsetInRowBatch = 0;
+    private int rowCountInOneBatch = 0;
+    private int readRowCount = 0;
+    private List<Row> rowBatch = new ArrayList<>();
+    private final ArrowStreamReader arrowStreamReader;
+    private final VectorSchemaRoot root;
+    private List<FieldVector> fieldVectors;
+    private RootAllocator rootAllocator;
+    private final Schema schema;
+
+    public List<Row> getRowBatch() {
+        return rowBatch;
+    }
+
+    public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
+        this.schema = schema;
+        this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
+        this.arrowStreamReader = new ArrowStreamReader(
+                new ByteArrayInputStream(nextResult.getRows()),
+                rootAllocator
+                );
+        this.offsetInRowBatch = 0;

Review comment:
       I'm not sure whether it is appropriate to put this while loop and exception logic in the constructor,
   but could you please rewrite it in a more elegant way?
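
A minimal sketch of one way to restructure this, assuming the loop reads batches and rejects malformed ones. The class and method names here are hypothetical and are not the connector's API. The constructor only assigns fields, and a static factory does the iteration and throws on bad input, so no half-built object can escape.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for RowBatch; illustrative only, not the connector's real class.
final class RowBatchSketch {
    private final List<List<Object>> rows;

    // The constructor only assigns fields, so it cannot fail half-way through.
    private RowBatchSketch(List<List<Object>> rows) {
        this.rows = Collections.unmodifiableList(rows);
    }

    // All looping and error handling lives in the static factory.
    static RowBatchSketch readFrom(Iterator<List<Object>> batches, int expectedColumns) {
        List<List<Object>> collected = new ArrayList<>();
        while (batches.hasNext()) {
            List<Object> row = batches.next();
            if (row.size() != expectedColumns) {
                throw new IllegalStateException(
                        "expected " + expectedColumns + " columns but got " + row.size());
            }
            collected.add(row);
        }
        return new RowBatchSketch(collected);
    }

    List<List<Object>> getRows() {
        return rows;
    }
}
```

Callers would then write `RowBatchSketch.readFrom(iterator, columnCount)` instead of `new`, and the exception path is visible in the factory's signature and body rather than hidden in construction.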

##########
File path: docs/en/extending-doris/flink-doris-connector.md
##########
@@ -0,0 +1,129 @@
+---
+{
+    "title": "Flink Doris Connector",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Doris Connector
+
+Flink Doris Connector can support reading data stored in Doris through Flink.
+
+- You can map the `Doris` table to` DataStream` or `Table`.
+
+## Version Compatibility
+
+| Connector | Flink | Doris  | Java | Scala |
+| --------- | ----- | ------ | ---- | ----- |
+| 1.0.0     | 1.11.2   | 0.14.7  | 8    | 2.12  |
+
+
+## Build and Install
+
+Execute following command in dir `extension/flink-doris-connector/`:
+
+```bash
+sh build.sh
+```
+
+After successful compilation, the file `doris-flink-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to `ClassPath` in `Flink` to use `Flink-Doris-Connector`. For example, `Flink` running in `Local` mode, put this file in the `jars/` folder. `Flink` running in `Yarn` cluster mode, put this file in the pre-deployment package.
+
+## Example
+
+### SQL
+

Review comment:
       
   ```suggestion
   The purpose of this step is to register the Doris data source in Flink.
   This step is performed in Flink.
   ```

##########
File path: extension/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkExample.java
##########
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+public class DorisSinkExample {
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        //streamload default delimiter: \t
+        DataStreamSource<String> source = env.fromElements("[{\"name\":\"doris\"}]\t1");
+        tEnv.createTemporaryView("doris_test",source,$("name"));
+
+//        Table wordWithCount = tEnv.sqlQuery("select name FROM doris_test  ");

Review comment:
       remove it

##########
File path: docs/zh-CN/extending-doris/flink-doris-connector.md
##########
@@ -0,0 +1,133 @@
+---
+{
+    "title": "Flink Doris Connector",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Doris Connector
+

Review comment:
       The review comments on the English document also apply to the Chinese document.

##########
File path: extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/RespContent.java
##########
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink.rest.models;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class RespContent {
+
+    @JsonProperty(value = "TxnId")
+    private int TxnId;
+
+    @JsonProperty(value = "Label")
+    private String Label;
+
+    @JsonProperty(value = "Status")
+    private String Status;
+
+    @JsonProperty(value = "ExistingJobStatus")
+    private String ExistingJobStatus;
+
+    @JsonProperty(value = "Message")
+    private String Message;
+
+    @JsonProperty(value = "NumberTotalRows")
+    private long NumberTotalRows;
+
+    @JsonProperty(value = "NumberLoadedRows")
+    private long NumberLoadedRows;
+
+    @JsonProperty(value = "NumberFilteredRows")
+    private int NumberFilteredRows;
+
+    @JsonProperty(value = "NumberUnselectedRows")
+    private int NumberUnselectedRows;
+
+    @JsonProperty(value = "LoadBytes")
+    private long LoadBytes;
+
+    @JsonProperty(value = "LoadTimeMs")
+    private int LoadTimeMs;
+
+    @JsonProperty(value = "BeginTxnTimeMs")
+    private int BeginTxnTimeMs;
+
+    @JsonProperty(value = "StreamLoadPutTimeMs")
+    private int StreamLoadPutTimeMs;
+
+    @JsonProperty(value = "ReadDataTimeMs")
+    private int ReadDataTimeMs;
+
+    @JsonProperty(value = "WriteDataTimeMs")
+    private int WriteDataTimeMs;
+
+    @JsonProperty(value = "CommitAndPublishTimeMs")
+    private int CommitAndPublishTimeMs;
+
+    @JsonProperty(value = "ErrorURL")
+    private String ErrorURL;
+
+    public String getStatus() {
+        return Status;
+    }
+
+    public String getMessage() {
+        return Message;
+    }
+
+    @Override
+    public String toString() {
+        return "RespContent{" +

Review comment:
       Would a JSON-based toString be better here?

##########
File path: extension/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java
##########
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.backend;
+
+import org.apache.doris.flink.cfg.ConfigurationOptions;
+import org.apache.doris.flink.cfg.Settings;
+import org.apache.doris.flink.exception.ConnectedFailedException;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.exception.DorisInternalException;
+import org.apache.doris.flink.serialization.Routing;
+import org.apache.doris.flink.util.ErrorMessages;
+import org.apache.doris.thrift.TDorisExternalService;
+import org.apache.doris.thrift.TScanBatchResult;
+import org.apache.doris.thrift.TScanCloseParams;
+import org.apache.doris.thrift.TScanCloseResult;
+import org.apache.doris.thrift.TScanNextBatchParams;
+import org.apache.doris.thrift.TScanOpenParams;
+import org.apache.doris.thrift.TScanOpenResult;
+import org.apache.doris.thrift.TStatusCode;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Client to request Doris BE
+ */
+public class BackendClient {
+    private static Logger logger = LoggerFactory.getLogger(BackendClient.class);
+
+    private Routing routing;
+
+    private TDorisExternalService.Client client;
+    private TTransport transport;
+
+    private boolean isConnected = false;
+    private final int retries;
+    private final int socketTimeout;
+    private final int connectTimeout;
+
+    public BackendClient(Routing routing, Settings settings) throws ConnectedFailedException {
+        this.routing = routing;
+        this.connectTimeout = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS,
+                ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT);
+        this.socketTimeout = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS,
+                ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT);
+        this.retries = settings.getIntegerProperty(ConfigurationOptions.DORIS_REQUEST_RETRIES,
+                ConfigurationOptions.DORIS_REQUEST_RETRIES_DEFAULT);
+        logger.trace("connect timeout set to '{}'. socket timeout set to '{}'. retries set to '{}'.",
+                this.connectTimeout, this.socketTimeout, this.retries);
+        open();
+    }
+
+    private void open() throws ConnectedFailedException {
+        logger.debug("Open client to Doris BE '{}'.", routing);
+        TException ex = null;
+        for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
+            logger.debug("Attempt {} to connect {}.", attempt, routing);
+            TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
+            transport = new TSocket(routing.getHost(), routing.getPort(), socketTimeout, connectTimeout);
+            TProtocol protocol = factory.getProtocol(transport);
+            client = new TDorisExternalService.Client(protocol);
+            if (isConnected) {
+                logger.info("Success connect to {}.", routing);
+                return;
+            }
+            try {
+                logger.trace("Connect status before open transport to {} is '{}'.", routing, isConnected);
+                if (!transport.isOpen()) {
+                    transport.open();
+                    isConnected = true;
+                }
+            } catch (TTransportException e) {
+                logger.warn(ErrorMessages.CONNECT_FAILED_MESSAGE, routing, e);
+                ex = e;
+            }
+
+        }
+        if (!isConnected) {
+            logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
+            throw new ConnectedFailedException(routing.toString(), ex);
+        }
+    }
+
+    private void close() {
+        logger.trace("Connect status before close with '{}' is '{}'.", routing, isConnected);
+        isConnected = false;
+        if ((transport != null) && transport.isOpen()) {
+            transport.close();
+            logger.info("Closed a connection to {}.", routing);
+        }
+        if (null != client) {
+            client = null;
+        }
+    }
+
+    /**
+     * Open a scanner for reading Doris data.
+     * @param openParams thrift struct to required by request
+     * @return scan open result
+     * @throws ConnectedFailedException throw if cannot connect to Doris BE
+     */
+    public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedFailedException {
+        logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams);
+        if (!isConnected) {
+            open();
+        }
+        TException ex = null;
+        for (int attempt = 0; attempt < retries; ++attempt) {
+            logger.debug("Attempt {} to openScanner {}.", attempt, routing);
+            try {
+                TScanOpenResult result = client.open_scanner(openParams);
+                if (result == null) {
+                    logger.warn("Open scanner result from {} is null.", routing);
+                    continue;
+                }
+                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                    logger.warn("The status of open scanner result from {} is '{}', error message is: {}.",
+                            routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs());
+                    continue;
+                }
+                return result;
+            } catch (TException e) {
+                logger.warn("Open scanner from {} failed.", routing, e);
+                ex = e;
+            }
+        }
+        logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
+        throw new ConnectedFailedException(routing.toString(), ex);
+    }
+
+    /**
+     * get next row batch from Doris BE
+     * @param nextBatchParams thrift struct to required by request
+     * @return scan batch result
+     * @throws ConnectedFailedException throw if cannot connect to Doris BE
+     */
+    public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException {
+        logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams);
+        if (!isConnected) {
+            open();
+        }
+        TException ex = null;
+        TScanBatchResult result = null;
+        for (int attempt = 0; attempt < retries; ++attempt) {
+            logger.debug("Attempt {} to getNext {}.", attempt, routing);
+            try {
+                result  = client.get_next(nextBatchParams);
+                if (result == null) {
+                    logger.warn("GetNext result from {} is null.", routing);
+                    continue;
+                }
+                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                    logger.warn("The status of get next result from {} is '{}', error message is: {}.",
+                            routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs());
+                    continue;
+                }
+                return result;
+            } catch (TException e) {
+                logger.warn("Get next from {} failed.", routing, e);
+                ex = e;
+            }
+        }
+        if (result != null && (TStatusCode.OK != (result.getStatus().getStatus_code()))) {
+            logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatus_code(),
+                    result.getStatus().getError_msgs());
+            throw new DorisInternalException(routing.toString(), result.getStatus().getStatus_code(),
+                    result.getStatus().getError_msgs());
+        }
+        logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
+        throw new ConnectedFailedException(routing.toString(), ex);
+    }
+
+    /**
+     * close an scanner.
+     * @param closeParams thrift struct to required by request
+     */
+    public void closeScanner(TScanCloseParams closeParams) {
+        logger.debug("CloseScanner to '{}', parameter is '{}'.", routing, closeParams);

Review comment:
       "CloseScanner from"? "Close scanner from"? "CloseScanner to"? "Close scanner to"? Which one is correct?

##########
File path: docs/zh-CN/extending-doris/flink-doris-connector.md
##########
@@ -0,0 +1,133 @@
+---
+{
+    "title": "Flink Doris Connector",
+    "language": "zh-CN"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Doris Connector
+
+Flink Doris Connector 可以支持通过 Flink 读取 Doris 中存储的数据。
+
+- 可以将`Doris`表映射为`DataStream`或者`Table`。
+
+## 版本兼容
+
+| Connector | Flink | Doris  | Java | Scala |
+| --------- | ----- | ------ | ---- | ----- |
+| 1.0.0     | 1.11.2   | 0.14.7  | 8    | 2.12  |
+
+
+## 编译与安装
+
+在 `extension/flink-doris-connector/` 源码目录下执行:
+
+```bash
+sh build.sh
+```
+
+编译成功后,会在 `output/` 目录下生成文件 `doris-flink-1.0.0-SNAPSHOT.jar`。将此文件复制到 `Flink` 的 `ClassPath` 中即可使用 `Flink-Doris-Connector`。例如,`Local` 模式运行的 `Flink`,将此文件放入 `jars/` 文件夹下。`Yarn`集群模式运行的`Flink`,则将此文件放入预部署包中。
+
+## 使用示例
+
+### SQL
+
+```sql
+CREATE TABLE flink_doris_source (
+    name STRING,
+    age INT,
+    price DECIMAL(5,2),
+    sale DOUBLE
+    ) 
+    WITH (
+      'connector' = 'doris',
+      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
+      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
+      'username' = '$YOUR_DORIS_USERNAME',
+      'password' = '$YOUR_DORIS_PASSWORD'
+);
+
+CREATE TABLE flink_doris_sink (
+    name STRING,
+    age INT,
+    price DECIMAL(5,2),
+    sale DOUBLE
+    ) 
+    WITH (
+      'connector' = 'doris',
+      'fenodes' = '$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT',
+      'table.identifier' = '$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME',
+      'username' = '$YOUR_DORIS_USERNAME',
+      'password' = '$YOUR_DORIS_PASSWORD'
+);
+
+INSERT INTO flink_doris_sink select name,age,price,sale from flink_doris_source
+```
+
+### DataStream
+
+```java
+DorisOptions.Builder options = DorisOptions.builder()
+                .setFenodes("$YOUR_DORIS_FE_HOSTNAME:$YOUR_DORIS_FE_RESFUL_PORT")
+                .setUsername("$YOUR_DORIS_USERNAME")
+                .setPassword("$YOUR_DORIS_PASSWORD")
+                .setTableIdentifier("$YOUR_DORIS_DATABASE_NAME.$YOUR_DORIS_TABLE_NAME");
+env.addSource(new DorisSourceFunction<>(options.build(),new SimpleListDeserializationSchema())).print();
+```
+ 
+
+## 配置
+
+### 通用配置项
+
+| Key                              | Default Value     | Comment                                                      |
+| -------------------------------- | ----------------- | ------------------------------------------------------------ |
+| fenodes                    | --                | Doris FE http 地址             |
+| table.identifier           | --                | Doris 表名,如:db1.tbl1                                 |
+| username                            | --            | 访问Doris的用户名                                            |
+| password                        | --            | 访问Doris的密码                                              |
+| sink.batch.size     | 100                | 单次写BE的最大行数        |
+| sink.max-retries     | 1                | 写BE失败之后的重试次数       |

Review comment:
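       It looks like there are more tunable parameters than the ones listed here. The meaning of each could be documented in a follow-up, for example mem limit.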

##########
File path: docs/en/extending-doris/flink-doris-connector.md
##########
@@ -0,0 +1,129 @@
+---
+{
+    "title": "Flink Doris Connector",
+    "language": "en"
+}
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Flink Doris Connector
+
+Flink Doris Connector can support reading data stored in Doris through Flink.
+
+- You can map the `Doris` table to` DataStream` or `Table`.
+
+## Version Compatibility
+
+| Connector | Flink | Doris  | Java | Scala |
+| --------- | ----- | ------ | ---- | ----- |
+| 1.0.0     | 1.11.2   | 0.14.7  | 8    | 2.12  |
+
+
+## Build and Install
+
+Execute following command in dir `extension/flink-doris-connector/`:
+
+```bash
+sh build.sh
+```
+
+After successful compilation, the file `doris-flink-1.0.0-SNAPSHOT.jar` will be generated in the `output/` directory. Copy this file to `ClassPath` in `Flink` to use `Flink-Doris-Connector`. For example, `Flink` running in `Local` mode, put this file in the `jars/` folder. `Flink` running in `Yarn` cluster mode, put this file in the pre-deployment package.
+
+## Example
+

Review comment:
       ```suggestion
   The purpose of this step is to register the Doris data source in Flink.
   This step is performed on the Flink side.
   There are two ways to do this: SQL and Scala. The following examples illustrate both.
   ```
   
   

##########
File path: extension/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
##########
@@ -0,0 +1,304 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.serialization;
+
+import com.google.common.base.Preconditions;
+import org.apache.arrow.memory.RootAllocator;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TinyIntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.types.Types;
+import org.apache.doris.flink.exception.DorisException;
+import org.apache.doris.flink.rest.models.Schema;
+import org.apache.doris.thrift.TScanBatchResult;
+
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.StringData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * row batch data container.
+ */
+public class RowBatch {
+    private static Logger logger = LoggerFactory.getLogger(RowBatch.class);
+
+    public static class Row {
+        private List<Object> cols;
+
+        Row(int colCount) {
+            this.cols = new ArrayList<>(colCount);
+        }
+
+        public  List<Object> getCols() {
+            return cols;
+        }
+
+        public void put(Object o) {
+            cols.add(o);
+        }
+    }
+
+    // offset for iterate the rowBatch
+    private int offsetInRowBatch = 0;
+    private int rowCountInOneBatch = 0;
+    private int readRowCount = 0;
+    private List<Row> rowBatch = new ArrayList<>();
+    private final ArrowStreamReader arrowStreamReader;
+    private final VectorSchemaRoot root;
+    private List<FieldVector> fieldVectors;
+    private RootAllocator rootAllocator;
+    private final Schema schema;
+
+    public List<Row> getRowBatch() {
+        return rowBatch;
+    }
+
+    public RowBatch(TScanBatchResult nextResult, Schema schema) throws DorisException {
+        this.schema = schema;
+        this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
+        this.arrowStreamReader = new ArrowStreamReader(
+                new ByteArrayInputStream(nextResult.getRows()),
+                rootAllocator
+                );
+        this.offsetInRowBatch = 0;
+        try {
+            this.root = arrowStreamReader.getVectorSchemaRoot();
+            while (arrowStreamReader.loadNextBatch()) {
+                fieldVectors = root.getFieldVectors();
+                if (fieldVectors.size() != schema.size()) {
+                    logger.error("Schema size '{}' is not equal to arrow field size '{}'.",
+                            schema.size(), fieldVectors.size());
+                    throw new DorisException("Load Doris data failed, schema size of fetch data is wrong.");
+                }
+                if (fieldVectors.size() == 0 || root.getRowCount() == 0) {
+                    logger.debug("One batch in arrow has no data.");
+                    continue;
+                }
+                rowCountInOneBatch = root.getRowCount();
+                // init the rowBatch
+                for (int i = 0; i < rowCountInOneBatch; ++i) {
+                    rowBatch.add(new Row(fieldVectors.size()));
+                }
+                convertArrowToRowBatch();
+                readRowCount += root.getRowCount();
+            }
+        } catch (Exception e) {
+            logger.error("Read Doris Data failed because: ", e);
+            throw new DorisException(e.getMessage());
+        } finally {
+            close();
+        }
+    }
+
+    public boolean hasNext() {
+        if (offsetInRowBatch < readRowCount) {
+            return true;
+        }
+        return false;
+    }
+
+    private void addValueToRow(int rowIndex, Object obj) {
+        if (rowIndex >= rowCountInOneBatch) {
+            String errMsg = "Get row offset: " + rowIndex + " larger than row size: " +
+                    rowCountInOneBatch;
+            logger.error(errMsg);
+            throw new NoSuchElementException(errMsg);
+        }
+        rowBatch.get(readRowCount + rowIndex).put(obj);
+    }
+
+    public void convertArrowToRowBatch() throws DorisException {
+        try {

Review comment:
       This method is worth having a unit test ~
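
       As a possible starting point, the row-assembly logic could be exercised in isolation. The sketch below is not the PR's code: `ColumnarToRows` and `appendBatch` are hypothetical names, and plain Java lists stand in for the Arrow field vectors, so it only mirrors how each batch appends its rows after the ones already read (the role `readRowCount` plays in `addValueToRow`).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for RowBatch: plain lists replace Arrow vectors (assumption). */
class ColumnarToRows {
    /**
     * Appends one batch of equally sized columns to the end of {@code rows},
     * producing one row per index across all columns.
     */
    static void appendBatch(List<List<Object>> rows, List<List<Object>> columns) {
        if (columns.isEmpty()) {
            return; // nothing to convert, like an empty Arrow batch
        }
        int rowCount = columns.get(0).size();
        for (List<Object> column : columns) {
            if (column.size() != rowCount) {
                throw new IllegalArgumentException("columns differ in length");
            }
        }
        int base = rows.size(); // rows read so far, analogous to readRowCount
        for (int i = 0; i < rowCount; i++) {
            rows.add(new ArrayList<>(columns.size()));
        }
        for (List<Object> column : columns) {
            for (int r = 0; r < rowCount; r++) {
                rows.get(base + r).add(column.get(r)); // null values are kept as null
            }
        }
    }
}
```

       A real test would feed Arrow data through `RowBatch` itself; this version only pins down the expected ordering across batches and the handling of null values.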



