You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/08/27 02:56:05 UTC

[incubator-doris] branch master updated: Flink reads multiple data sources to doris (#6490)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cfebc3  Flink reads multiple data sources to doris (#6490)
4cfebc3 is described below

commit 4cfebc35a7a63dd27001f2561a966dee2b3892db
Author: caoliang-web <71...@users.noreply.github.com>
AuthorDate: Fri Aug 27 10:55:53 2021 +0800

    Flink reads multiple data sources to doris (#6490)
    
    * Flink reads multiple data sources to doris
    
    Co-authored-by: caol <ca...@shuhaisc.com>
---
 samples/doris-demo/flink-demo/pom.xml              |  47 ++++
 .../org/apache/doris/demo/flink/DorisSink.java     |  85 +++++++
 .../apache/doris/demo/flink/DorisStreamLoad.java   | 163 +++++++++++++
 .../org/apache/doris/demo/flink/RespContent.java   | 262 +++++++++++++++++++++
 .../java/org/apache/doris/demo/flink/User.java     |  91 +++++++
 .../doris/demo/flink/analyze/DorisSource.java      |  97 ++++++++
 .../flink/analyze/FlinkDorisConnectorAnalyze.java  |  79 +++++++
 .../flink/analyze/FlinkJdbcConnectorAnalyze.java   |  70 ++++++
 .../doris/demo/flink/analyze/instructions.md       |  53 +++++
 .../flink/hdfs/FlinkCustomHdfsSource2Doris.java    | 106 +++++++++
 .../demo/flink/hdfs/FlinkReadTextHdfs2Doris.java   | 108 +++++++++
 .../apache/doris/demo/flink/hdfs/HdfsSource.java   |  88 +++++++
 .../apache/doris/demo/flink/hdfs/instructions.md   |  70 ++++++
 .../doris/demo/flink/kafka/FlinkKafka2Doris.java   |  87 +++++++
 .../apache/doris/demo/flink/kafka/instructions.md  |  56 +++++
 15 files changed, 1462 insertions(+)

diff --git a/samples/doris-demo/flink-demo/pom.xml b/samples/doris-demo/flink-demo/pom.xml
index 808c7a0..efe2b4c 100644
--- a/samples/doris-demo/flink-demo/pom.xml
+++ b/samples/doris-demo/flink-demo/pom.xml
@@ -34,6 +34,8 @@ under the License.
         <scala.binary.version>2.12</scala.binary.version>
         <java.version>1.8</java.version>
         <flink.version>1.12.2</flink.version>
+        <fastjson.version>1.2.62</fastjson.version>
+        <hadoop.version>2.8.3</hadoop.version>
         <scope.mode>compile</scope.mode>
     </properties>
 
@@ -73,6 +75,51 @@ under the License.
             <artifactId>flink-connector-jdbc_2.11</artifactId>
             <version>${flink.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>${fastjson.version}</version>
+        </dependency>
+        <!-- Scala suffix follows scala.binary.version so it matches the other Flink artifacts -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+            <version>2.6.5-10.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache</groupId>
+            <artifactId>doris-flink</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+
+
+        <!--After adding the following two dependencies, Flink's log will appear-->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>1.7.25</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisSink.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisSink.java
new file mode 100644
index 0000000..911acfe
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisSink.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Custom doris sink
+ */
+public class DorisSink extends RichSinkFunction<String> {
+
+
+    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);
+
+    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
+
+    private DorisStreamLoad dorisStreamLoad;
+
+    private String columns;
+
+    private String jsonFormat;
+
+    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
+        this.dorisStreamLoad = dorisStreamLoad;
+        this.columns = columns;
+        this.jsonFormat = jsonFormat;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+    }
+
+
+    /**
+     * Determine whether StreamLoad is successful
+     *
+     * @param respContent streamLoad returns the entity
+     * @return
+     */
+    public static Boolean checkStreamLoadStatus(RespContent respContent) {
+        if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
+                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void invoke(String value, Context context) throws Exception {
+        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
+        if (loadResponse != null && loadResponse.status == 200) {
+            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
+            if (!checkStreamLoadStatus(respContent)) {
+                log.error("Stream Load fail{}:", loadResponse);
+            }
+        } else {
+            log.error("Stream Load Request failed:{}", loadResponse);
+        }
+    }
+}
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisStreamLoad.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisStreamLoad.java
new file mode 100644
index 0000000..bc43dbe
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisStreamLoad.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.Serializable;
+import java.io.IOException;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.UUID;
+
+
+/**
+ * Minimal Doris Stream Load client.
+ *
+ * <p>A load is a two-step HTTP exchange: a PUT to the FE, which answers with a
+ * 307 redirect, followed by a PUT carrying the data to the redirect location.
+ */
+
+public class DorisStreamLoad implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
+
+    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
+    /** HTTP 307; {@link HttpURLConnection} has no constant for it. */
+    private static final int HTTP_TEMPORARY_REDIRECT = 307;
+
+    private final String hostPort;
+    private final String db;
+    private final String tbl;
+    private final String user;
+    private final String passwd;
+    private final String loadUrlStr;
+    private final String authEncoding;
+
+
+    /**
+     * @param hostPort FE address in {@code host:port} form
+     * @param db       target database
+     * @param tbl      target table
+     * @param user     user name for HTTP basic authentication
+     * @param passwd   password for HTTP basic authentication
+     */
+    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
+        this.hostPort = hostPort;
+        this.db = db;
+        this.tbl = tbl;
+        this.user = user;
+        this.passwd = passwd;
+        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, tbl);
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+    }
+
+    /**
+     * Builds (but does not send) a PUT request carrying the Stream Load headers.
+     * Redirects are not followed automatically; the caller handles the redirect itself.
+     */
+    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
+        URL url = new URL(urlStr);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setInstanceFollowRedirects(false);
+        conn.setRequestMethod("PUT");
+        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+        conn.addRequestProperty("Expect", "100-continue");
+        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
+        conn.addRequestProperty("label", label);
+        conn.addRequestProperty("max_filter_ratio", "0");
+        conn.addRequestProperty("strict_mode", "true");
+        conn.addRequestProperty("columns", columns);
+        conn.addRequestProperty("format", "json");
+        conn.addRequestProperty("jsonpaths", jsonformat);
+        conn.addRequestProperty("strip_outer_array", "true");
+        conn.setDoOutput(true);
+        conn.setDoInput(true);
+
+        return conn;
+    }
+
+    /** Outcome of one load attempt: HTTP status, HTTP reason phrase and response body. */
+    public static class LoadResponse {
+        public int status;
+        public String respMsg;
+        public String respContent;
+
+        public LoadResponse(int status, String respMsg, String respContent) {
+            this.status = status;
+            this.respMsg = respMsg;
+            this.respContent = respContent;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("status: ").append(status);
+            sb.append(", resp msg: ").append(respMsg);
+            sb.append(", resp content: ").append(respContent);
+            return sb.toString();
+        }
+    }
+
+    /**
+     * Loads one batch of data into the configured table.
+     *
+     * @param data       payload, sent UTF-8 encoded as the request body
+     * @param columns    value of the {@code columns} request header
+     * @param jsonformat value of the {@code jsonpaths} request header
+     * @return the HTTP status, reason phrase and body returned for the data request; if any
+     *         step throws, a response with status {@code -1} whose message is the exception
+     *         message. This method does not propagate exceptions.
+     */
+    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
+        Calendar calendar = Calendar.getInstance();
+        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
+                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
+                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
+                UUID.randomUUID().toString().replace("-", ""));
+
+        HttpURLConnection feConn = null;
+        HttpURLConnection beConn = null;
+        try {
+            // build request and send to fe
+            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
+            int status = feConn.getResponseCode();
+            // fe sends back http response code TEMPORARY_REDIRECT 307 and the new be location
+            if (status != HTTP_TEMPORARY_REDIRECT) {
+                throw new IOException("status is not TEMPORARY_REDIRECT 307, status: " + status);
+            }
+            String location = feConn.getHeaderField("Location");
+            if (location == null) {
+                throw new IOException("redirect location is null");
+            }
+            // build request and send to new be location
+            beConn = getConnection(location, label, columns, jsonformat);
+            // send data to be; encode explicitly so the bytes match the declared charset
+            try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
+                bos.write(data.getBytes(StandardCharsets.UTF_8));
+            }
+
+            // get response
+            status = beConn.getResponseCode();
+            String respMsg = beConn.getResponseMessage();
+            return new LoadResponse(status, respMsg, readBody(beConn, status));
+
+        } catch (Exception e) {
+            String err = "failed to stream load data with label: " + label;
+            log.warn(err, e);
+            return new LoadResponse(-1, e.getMessage(), err);
+        } finally {
+            if (feConn != null) {
+                feConn.disconnect();
+            }
+            if (beConn != null) {
+                beConn.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Reads the whole response body as UTF-8 text, with line terminators removed.
+     * For status codes of 400 and above the error stream is read instead of the input stream.
+     *
+     * @return the body, or an empty string when the connection offers no stream
+     */
+    private static String readBody(HttpURLConnection conn, int status) throws IOException {
+        InputStream stream = status >= HttpURLConnection.HTTP_BAD_REQUEST
+                ? conn.getErrorStream()
+                : conn.getInputStream();
+        if (stream == null) {
+            return "";
+        }
+        StringBuilder response = new StringBuilder();
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(stream, StandardCharsets.UTF_8))) {
+            String line;
+            while ((line = br.readLine()) != null) {
+                response.append(line);
+            }
+        }
+        return response.toString();
+    }
+
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/RespContent.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/RespContent.java
new file mode 100644
index 0000000..1f0385a
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/RespContent.java
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink;
+
+import java.io.Serializable;
+
+/**
+ * Bean holding the fields of a Stream Load response body.
+ */
+public class RespContent implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+
+    /**
+     * Transaction ID of the import. Users normally do not need it.
+     */
+    private int TxnId;
+    /**
+     * Import label, either specified by the user or generated by the system.
+     */
+    private String Label;
+    /**
+     * Final status of the import.
+     * "Success": the import succeeded.
+     * "Publish Timeout": the import has also completed, but the data may become visible with a delay; no retry is needed.
+     * "Label Already Exists": the label is duplicated and a different label must be used.
+     */
+    private String Status;
+    /**
+     * Status of the import job that already owns the label.
+     * Only present when Status is "Label Already Exists".
+     * "RUNNING" means that job is still executing, "FINISHED" means it succeeded.
+     */
+    private String ExistingJobStatus;
+    /**
+     * Error information of the import.
+     */
+    private String Message;
+    /**
+     * Total number of rows processed by the import.
+     */
+    private long NumberTotalRows;
+    /**
+     * Number of rows imported successfully.
+     */
+    private long NumberLoadedRows;
+    /**
+     * Number of rows rejected because of data quality.
+     */
+    private int NumberFilteredRows;
+    /**
+     * Number of rows filtered out by the where condition.
+     */
+    private int NumberUnselectedRows;
+    /**
+     * Number of bytes imported.
+     */
+    private long LoadBytes;
+    /**
+     * Time taken to complete the import, in milliseconds.
+     */
+    private int LoadTimeMs;
+    /**
+     * Time taken to ask the FE to begin a transaction, in milliseconds.
+     */
+    private int BeginTxnTimeMs;
+    /**
+     * Time taken to obtain the import execution plan from the FE, in milliseconds.
+     */
+    private int StreamLoadPutTimeMs;
+    /**
+     * Time spent reading data, in milliseconds.
+     */
+    private int ReadDataTimeMs;
+    /**
+     * Time spent writing data, in milliseconds.
+     */
+    private int WriteDataTimeMs;
+    /**
+     * Time taken to submit and publish the transaction request to the FE, in milliseconds.
+     */
+    private int CommitAndPublishTimeMs;
+    /**
+     * URL listing the offending rows when there is a data quality problem.
+     */
+    private String ErrorURL;
+
+    public int getTxnId() {
+        return this.TxnId;
+    }
+
+    public void setTxnId(int txnId) {
+        this.TxnId = txnId;
+    }
+
+    public String getLabel() {
+        return this.Label;
+    }
+
+    public void setLabel(String label) {
+        this.Label = label;
+    }
+
+    public String getStatus() {
+        return this.Status;
+    }
+
+    public void setStatus(String status) {
+        this.Status = status;
+    }
+
+    public String getExistingJobStatus() {
+        return this.ExistingJobStatus;
+    }
+
+    public void setExistingJobStatus(String existingJobStatus) {
+        this.ExistingJobStatus = existingJobStatus;
+    }
+
+    public String getMessage() {
+        return this.Message;
+    }
+
+    public void setMessage(String message) {
+        this.Message = message;
+    }
+
+    public long getNumberTotalRows() {
+        return this.NumberTotalRows;
+    }
+
+    public void setNumberTotalRows(long numberTotalRows) {
+        this.NumberTotalRows = numberTotalRows;
+    }
+
+    public long getNumberLoadedRows() {
+        return this.NumberLoadedRows;
+    }
+
+    public void setNumberLoadedRows(long numberLoadedRows) {
+        this.NumberLoadedRows = numberLoadedRows;
+    }
+
+    public int getNumberFilteredRows() {
+        return this.NumberFilteredRows;
+    }
+
+    public void setNumberFilteredRows(int numberFilteredRows) {
+        this.NumberFilteredRows = numberFilteredRows;
+    }
+
+    public int getNumberUnselectedRows() {
+        return this.NumberUnselectedRows;
+    }
+
+    public void setNumberUnselectedRows(int numberUnselectedRows) {
+        this.NumberUnselectedRows = numberUnselectedRows;
+    }
+
+    public long getLoadBytes() {
+        return this.LoadBytes;
+    }
+
+    public void setLoadBytes(long loadBytes) {
+        this.LoadBytes = loadBytes;
+    }
+
+    public int getLoadTimeMs() {
+        return this.LoadTimeMs;
+    }
+
+    public void setLoadTimeMs(int loadTimeMs) {
+        this.LoadTimeMs = loadTimeMs;
+    }
+
+    public int getBeginTxnTimeMs() {
+        return this.BeginTxnTimeMs;
+    }
+
+    public void setBeginTxnTimeMs(int beginTxnTimeMs) {
+        this.BeginTxnTimeMs = beginTxnTimeMs;
+    }
+
+    public int getStreamLoadPutTimeMs() {
+        return this.StreamLoadPutTimeMs;
+    }
+
+    public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
+        this.StreamLoadPutTimeMs = streamLoadPutTimeMs;
+    }
+
+    public int getReadDataTimeMs() {
+        return this.ReadDataTimeMs;
+    }
+
+    public void setReadDataTimeMs(int readDataTimeMs) {
+        this.ReadDataTimeMs = readDataTimeMs;
+    }
+
+    public int getWriteDataTimeMs() {
+        return this.WriteDataTimeMs;
+    }
+
+    public void setWriteDataTimeMs(int writeDataTimeMs) {
+        this.WriteDataTimeMs = writeDataTimeMs;
+    }
+
+    public int getCommitAndPublishTimeMs() {
+        return this.CommitAndPublishTimeMs;
+    }
+
+    public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
+        this.CommitAndPublishTimeMs = commitAndPublishTimeMs;
+    }
+
+    public String getErrorURL() {
+        return this.ErrorURL;
+    }
+
+    public void setErrorURL(String errorURL) {
+        this.ErrorURL = errorURL;
+    }
+
+    /**
+     * Renders every field as {@code Name=value}, quoting the string fields with single quotes.
+     */
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder("RespContent{");
+        sb.append("TxnId=").append(TxnId);
+        sb.append(", Label='").append(Label).append('\'');
+        sb.append(", Status='").append(Status).append('\'');
+        sb.append(", ExistingJobStatus='").append(ExistingJobStatus).append('\'');
+        sb.append(", Message='").append(Message).append('\'');
+        sb.append(", NumberTotalRows=").append(NumberTotalRows);
+        sb.append(", NumberLoadedRows=").append(NumberLoadedRows);
+        sb.append(", NumberFilteredRows=").append(NumberFilteredRows);
+        sb.append(", NumberUnselectedRows=").append(NumberUnselectedRows);
+        sb.append(", LoadBytes=").append(LoadBytes);
+        sb.append(", LoadTimeMs=").append(LoadTimeMs);
+        sb.append(", BeginTxnTimeMs=").append(BeginTxnTimeMs);
+        sb.append(", StreamLoadPutTimeMs=").append(StreamLoadPutTimeMs);
+        sb.append(", ReadDataTimeMs=").append(ReadDataTimeMs);
+        sb.append(", WriteDataTimeMs=").append(WriteDataTimeMs);
+        sb.append(", CommitAndPublishTimeMs=").append(CommitAndPublishTimeMs);
+        sb.append(", ErrorURL='").append(ErrorURL).append('\'');
+        sb.append('}');
+        return sb.toString();
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/User.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/User.java
new file mode 100644
index 0000000..3ce8f15
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/User.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink;
+
+
+import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.List;
+
+public class User implements DorisDeserializationSchema {
+
+    private String name;
+
+    private Integer age;
+
+    private String price;
+
+    private String sale;
+
+    public User(String name, Integer age, String price, String sale) {
+        this.name = name;
+        this.age = age;
+        this.price = price;
+        this.sale = sale;
+    }
+
+    public User() {
+    }
+
+    public void setAge(Integer age) {
+        this.age = age;
+    }
+
+    public Integer getAge() {
+        return age;
+    }
+
+    public String getPrice() {
+        return price;
+    }
+
+    public void setPrice(String price) {
+        this.price = price;
+    }
+
+    public String getSale() {
+        return sale;
+    }
+
+    public void setSale(String sale) {
+        this.sale = sale;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public String toString() {
+        return "User{" +
+                "name='" + name + '\'' +
+                ", age=" + age +
+                ", price='" + price + '\'' +
+                ", sale='" + sale + '\'' +
+                '}';
+    }
+
+    @Override
+    public TypeInformation getProducedType() {
+        return TypeInformation.of(List.class);
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/DorisSource.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/DorisSource.java
new file mode 100644
index 0000000..5432d90
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/DorisSource.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.analyze;
+
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Custom Flink JDBC source.
+ *
+ * <p>Runs the statement configured under the {@code sql} property once and emits one
+ * {@link User} per result row. Connection settings come from the {@code url},
+ * {@code username} and {@code password} properties.
+ */
+public class DorisSource extends RichSourceFunction<User> {
+    private static final Logger logger = LoggerFactory.getLogger(DorisSource.class);
+
+    private final Properties properties;
+    // JDBC handles are created in open(); they are never part of the serialized function.
+    private transient Connection connection;
+    private transient PreparedStatement ps;
+    /** Cleared by {@link #cancel()} so the emit loop in {@link #run} stops. */
+    private volatile boolean running = true;
+
+
+    /**
+     * @param properties must provide {@code url}, {@code username}, {@code password} and {@code sql}
+     */
+    public DorisSource(Properties properties){
+        this.properties=properties;
+    }
+
+    /** Loads the MySQL JDBC driver, opens the connection and prepares the configured statement. */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        Class.forName("com.mysql.jdbc.Driver");
+        connection = DriverManager.getConnection(properties.getProperty("url"),
+                    properties.getProperty("username"),
+                    properties.getProperty("password"));
+        ps = connection.prepareStatement(properties.getProperty("sql"));
+    }
+
+
+    /**
+     * Executes the query and emits every row as a {@link User}, reading the
+     * {@code name}, {@code age}, {@code price} and {@code sale} columns.
+     * Exceptions are logged and not rethrown; JDBC resources are released on exit.
+     */
+    @Override
+    public void run(SourceContext<User> sourceContext) throws Exception {
+        try (ResultSet resultSet = ps.executeQuery()) {
+            while (running && resultSet.next()) {
+                User user = new User();
+                user.setName(resultSet.getString("name"));
+                user.setAge(resultSet.getInt("age"));
+                user.setPrice(resultSet.getString("price"));
+                user.setSale(resultSet.getString("sale"));
+                sourceContext.collect(user);
+            }
+        } catch (Exception e) {
+            // Pass the exception as the trailing argument so SLF4J prints its stack trace.
+            logger.error("runException:", e);
+        } finally {
+            closeResources();
+        }
+
+    }
+
+    /** Stops the emit loop and releases the JDBC resources. */
+    @Override
+    public void cancel() {
+        running = false;
+        closeResources();
+    }
+
+    /** Releases the JDBC resources even when {@link #run} was never invoked. */
+    @Override
+    public void close() throws Exception {
+        closeResources();
+        super.close();
+    }
+
+    /**
+     * Closes the statement, then the connection. Safe to call repeatedly:
+     * closing an already closed JDBC object is a no-op.
+     */
+    private void closeResources() {
+        if (ps != null) {
+            try {
+                ps.close();
+            } catch (SQLException e) {
+                logger.warn("failed to close statement", e);
+            }
+        }
+        if (connection != null) {
+            try {
+                connection.close();
+            } catch (SQLException e) {
+                logger.warn("failed to close connection", e);
+            }
+        }
+    }
+}
+
+
+
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkDorisConnectorAnalyze.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkDorisConnectorAnalyze.java
new file mode 100644
index 0000000..54589f4
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkDorisConnectorAnalyze.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.analyze;
+
+import org.apache.doris.demo.flink.User;
+import org.apache.doris.flink.cfg.DorisStreamOptions;
+import org.apache.doris.flink.datastream.DorisSourceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+
+/**
+ * This example mainly demonstrates the use of Flink connector to read Doris data,
+ * build DataStream for analysis, and sum.
+ * <p>
+ * use flink doris connector
+ */
+public class FlinkDorisConnectorAnalyze {
+
+    public static void main(String[] args) throws Exception {
+        Properties properties = new Properties();
+        properties.put("fenodes", "IP:8030");
+        properties.put("username", "root");
+        properties.put("password", "");
+        properties.put("table.identifier", "test1.doris_test_source_2");
+
+        DorisStreamOptions options = new DorisStreamOptions(properties);
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(2);
+
+        DataStreamSource dataStreamSource = env.addSource(new DorisSourceFunction(options, new User()));
+
+        SingleOutputStreamOperator outputStream = dataStreamSource.map(new MapFunction<Object, User>() {
+            @Override
+            public User map(Object obj) throws Exception {
+                User user = new User();
+                if (obj instanceof ArrayList<?>) {
+                    user.setName(((ArrayList<?>) obj).get(0).toString());
+                    user.setAge(Integer.valueOf(((ArrayList<?>) obj).get(1).toString()));
+                    user.setPrice(((ArrayList<?>) obj).get(2).toString());
+                    user.setSale(((ArrayList<?>) obj).get(3).toString());
+                }
+                return user;
+            }
+        });
+
+        outputStream.keyBy(new KeySelector<User, String>() {
+            @Override
+            public String getKey(User user) throws Exception {
+                return user.getName();
+            }
+        })
+                .sum("age")
+                .print();
+
+        env.execute("Flink doris connector analyze test");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkJdbcConnectorAnalyze.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkJdbcConnectorAnalyze.java
new file mode 100644
index 0000000..3912f5b
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkJdbcConnectorAnalyze.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.analyze;
+
+
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Properties;
+
+/**
+ * Demo job that reads Doris rows through the custom JDBC-based {@link DorisSource},
+ * groups the resulting {@link User} stream by name, and prints a running sum of
+ * {@code age} per name.
+ */
+public class FlinkJdbcConnectorAnalyze {
+
+    /**
+     * Builds the pipeline and submits it.
+     *
+     * @param args command-line arguments (not used)
+     * @throws Exception if the job cannot be built or executed
+     */
+    public static void main(String[] args) throws Exception {
+        // Settings handed to DorisSource; "IP" is a placeholder for the Doris host.
+        Properties sourceConfig = new Properties();
+        sourceConfig.setProperty("url", "jdbc:mysql://IP:9030");
+        sourceConfig.setProperty("username", "root");
+        sourceConfig.setProperty("password", "");
+        sourceConfig.setProperty("sql", "select * from test1.doris_test_source_2");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStreamSource<User> users = env.addSource(new DorisSource(sourceConfig));
+
+        // Group the records by user name.
+        KeySelector<User, String> byName = user -> user.getName();
+        KeyedStream<User, String> usersByName = users.keyBy(byName);
+
+        // Rolling reduce: carry over name, price and sale from the accumulated
+        // record and add the ages together.
+        ReduceFunction<User> sumAges = (acc, next) ->
+                new User(acc.getName(), acc.getAge() + next.getAge(), acc.getPrice(), acc.getSale());
+        SingleOutputStreamOperator<User> ageTotals = usersByName.reduce(sumAges);
+
+        ageTotals.print();
+
+        env.execute("flink custom source analyze test");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/instructions.md b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/instructions.md
new file mode 100644
index 0000000..c519502
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/instructions.md
@@ -0,0 +1,53 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Introduction
+
+This series of samples shows how to read data from Doris in Flink, using either the Flink Doris connector or a custom
+Flink JDBC source, build a DataStream from it, and run a simple analysis. The code examples are based on actual usage
+scenarios.
+
+# Implementation methods
+
+1. Flink Doris connector (recommended)
+
+Realized through flink doris connector
+
+**Note:** Because the Flink Doris connector jar is not published to the Maven Central repository, you need to compile it
+separately and add it to the classpath of your project. For how to compile and use the connector, see
+[Flink doris connector](https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html).
+
+2. Flink JDBC connector
+
+First, load the Doris-related configuration:
+
+  ``` 
+        properties.put("url", "jdbc:mysql://ip:9030");
+        properties.put("username", "root");
+        properties.put("password", "");
+        properties.put("sql", "select * from db.tb");
+ ```
+Second, implement a custom source:
+
+```java
+org.apache.doris.demo.flink.analyze.DorisSource
+```
+
+
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkCustomHdfsSource2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkCustomHdfsSource2Doris.java
new file mode 100644
index 0000000..ba7562b
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkCustomHdfsSource2Doris.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.hdfs;
+
+
+import com.alibaba.fastjson.JSON;
+import org.apache.doris.demo.flink.DorisSink;
+import org.apache.doris.demo.flink.DorisStreamLoad;
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Demo job that streams the lines of an HDFS text file (txt or csv) through the
+ * custom {@link HdfsSource}, converts every data row into a JSON array holding
+ * one {@link User}, and writes it to Doris via stream load.
+ */
+public class FlinkCustomHdfsSource2Doris {
+    // HDFS file to read
+    private static final String HDFS_FILE_PATH = "hdfs://IP:8020/tmp/2.csv";
+    // value for Flink's "fs.default-scheme" option
+    private static final String FS_DEFAULT_SCHEME = "hdfs://IP:8020";
+    // Doris host:port given to DorisStreamLoad
+    private static final String DORIS_HOST_PORT = "IP:8030";
+    // Doris database name
+    private static final String DORIS_DB = "test1";
+    // Doris table name
+    private static final String DORIS_TABLE = "doris_test_source_2";
+    // Doris user name
+    private static final String DORIS_USER = "root";
+    // Doris password
+    private static final String DORIS_PASSWORD = "";
+    // Doris columns passed to DorisSink
+    private static final String DORIS_COLUMNS = "name,age,price,sale";
+    // JSON format (one path per column) passed to DorisSink
+    private static final String JSON_FORMAT = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
+    // header row of the input file; rows equal to it are dropped
+    private static final String CSV_HEADER = "name,age,price,sale";
+
+    /**
+     * Builds the pipeline and submits it.
+     *
+     * @param args command-line arguments (not used)
+     * @throws Exception if the job cannot be built or executed
+     */
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10000);
+        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        // Custom source: reads the file line by line.
+        Configuration fsConfig = new Configuration();
+        fsConfig.setString("fs.default-scheme", FS_DEFAULT_SCHEME);
+        DataStreamSource<String> lines = env.addSource(new HdfsSource(fsConfig, HDFS_FILE_PATH));
+
+        // Drop the header row.
+        FilterFunction<String> notHeader = line -> !CSV_HEADER.equals(line);
+
+        // Turn "name,age,price,sale" into a JSON array containing a single User.
+        MapFunction<String, String> toUserJson = row -> {
+            String[] fields = row.split("\\,");
+            User user = new User();
+            user.setName(fields[0]);
+            user.setAge(Integer.valueOf(fields[1]));
+            user.setPrice(fields[2]);
+            user.setSale(fields[3]);
+            List<User> batch = new ArrayList<>();
+            batch.add(user);
+            return JSON.toJSONString(batch);
+        };
+
+        SingleOutputStreamOperator<String> jsonRows = lines.filter(notHeader).map(toUserJson);
+
+        DorisStreamLoad streamLoad =
+                new DorisStreamLoad(DORIS_HOST_PORT, DORIS_DB, DORIS_TABLE, DORIS_USER, DORIS_PASSWORD);
+        jsonRows.addSink(new DorisSink(streamLoad, DORIS_COLUMNS, JSON_FORMAT));
+
+        env.execute("flink custom hdfs source to doris");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkReadTextHdfs2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkReadTextHdfs2Doris.java
new file mode 100644
index 0000000..5a36371
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkReadTextHdfs2Doris.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.hdfs;
+
+
+import com.alibaba.fastjson.JSON;
+import org.apache.doris.demo.flink.DorisSink;
+import org.apache.doris.demo.flink.DorisStreamLoad;
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Demo job that reads an HDFS text file (txt or csv) with Flink's built-in
+ * {@code readTextFile}, converts every data row into a JSON array holding one
+ * {@link User}, and writes it to Doris via stream load.
+ */
+public class FlinkReadTextHdfs2Doris {
+    //hdfs path
+    private static final String path = "hdfs://IP:8020/tmp/2.csv";
+    //doris ip port
+    private static final String hostPort = "IP:8030";
+    //doris dbName
+    private static final String dbName = "test1";
+    //doris tbName
+    private static final String tbName = "doris_test_source_2";
+    //doris userName
+    private static final String userName = "root";
+    //doris password
+    private static final String password = "";
+    //doris columns
+    private static final String columns = "name,age,price,sale";
+    //json format
+    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
+    //header row of the input file; rows equal to it are dropped
+    private static final String header = "name,age,price,sale";
+
+    /**
+     * Builds the pipeline and submits it.
+     *
+     * @param args command-line arguments (not used)
+     * @throws Exception if the job cannot be built or executed
+     */
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        blinkStreamEnv.enableCheckpointing(10000);
+        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        //flink readTextFile
+        DataStreamSource<String> dataStreamSource = blinkStreamEnv.readTextFile(path);
+
+        //filter head
+        SingleOutputStreamOperator<String> resultData = dataStreamSource.filter(new FilterFunction<String>() {
+            @Override
+            public boolean filter(String s) throws Exception {
+                // keep every line except the header row
+                return !header.equals(s);
+            }
+        }).map(new MapFunction<String, String>() {
+            @Override
+            public String map(String s) throws Exception {
+                // "name,age,price,sale" -> JSON array containing a single User
+                String[] split = s.split("\\,");
+                User user = new User();
+                user.setName(split[0]);
+                user.setAge(Integer.valueOf(split[1]));
+                user.setPrice(split[2]);
+                user.setSale(split[3]);
+                List<User> users = new ArrayList<>();
+                users.add(user);
+                return JSON.toJSONString(users);
+            }
+        });
+
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
+
+        resultData.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));
+
+        // also print the converted rows so the demo output is visible
+        resultData.print();
+
+        blinkStreamEnv.execute("flink readText hdfs file to doris");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/HdfsSource.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/HdfsSource.java
new file mode 100644
index 0000000..d376b92
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/HdfsSource.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+/**
+ * Custom hdfs source
+ */
+public class HdfsSource extends RichSourceFunction<String> {
+
+    // Flink HDFS FileSystem
+    private Configuration configuration;
+    // hdfs path
+    private String path;
+
+    public HdfsSource(Configuration configuration, String path) {
+        this.configuration = configuration;
+        this.path = path;
+    }
+
+
+    @Override
+    public void run(SourceContext<String> sourceContext) throws Exception {
+        FileSystem.initialize(configuration, null);
+        FileSystem fileSystem = FileSystem.get(FileSystem.getDefaultFsUri());
+        readHdfsFile(sourceContext, fileSystem);
+
+    }
+
+    @Override
+    public void cancel() {
+    }
+
+    public void readHdfsFile(SourceContext<String> sourceContext, FileSystem fs) {
+        FSDataInputStream dataInputStream = null;
+        BufferedReader bufferedReader = null;
+        try {
+            dataInputStream = fs.open(new Path(path));
+            bufferedReader = new BufferedReader(new InputStreamReader(dataInputStream));
+            String line;
+            while ((line = bufferedReader.readLine()) != null) {
+                sourceContext.collect(line);
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("read hdfs file fail");
+        } finally {
+            if (bufferedReader != null) {
+                try {
+                    bufferedReader.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+            if (dataInputStream != null) {
+                try {
+                    dataInputStream.close();
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/instructions.md b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/instructions.md
new file mode 100644
index 0000000..e8a62b1
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/instructions.md
@@ -0,0 +1,70 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Introduction
+
+This series of samples shows how to use Flink to read HDFS files (txt and csv) and write them to Doris through
+Doris stream load, with code examples.
+
+# Implementation methods
+
+1. Flink readTextFile (recommended)
+
+```java
+org.apache.doris.demo.flink.hdfs.FlinkReadTextHdfs2Doris
+```
+
+Sample program
+
+```
+DataStreamSource<String> dataStreamSource = blinkStreamEnv.readTextFile("hdfs://xxx:8020/test/txt");
+...
+```
+
+2. Flink custom HDFS source (implementation code)
+
+```java
+org.apache.doris.demo.flink.hdfs.HdfsSource
+```
+
+Custom doris sink
+
+```java
+org.apache.doris.demo.flink.DorisSink
+```
+
+Sample program
+
+```
+   public class DorisSink extends RichSinkFunction<String> {
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+    }
+    @Override
+    public void invoke(String value, Context context) throws Exception {
+        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
+        .....
+    }
+}
+```
+
+
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/FlinkKafka2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/FlinkKafka2Doris.java
new file mode 100644
index 0000000..9adf0dd
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/FlinkKafka2Doris.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.kafka;
+
+
+import org.apache.doris.demo.flink.DorisSink;
+import org.apache.doris.demo.flink.DorisStreamLoad;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.Properties;
+
+/**
+ * Demo job that consumes string records from a Kafka topic and writes each
+ * record to Doris via stream load (through {@link DorisSink}).
+ * <p>
+ * Each Kafka record is expected to be a JSON array of objects, for example:
+ * [{"name":"doris","age":18,"price":"10.5","sale":"8.5"}]
+ */
+public class FlinkKafka2Doris {
+    // Kafka bootstrap servers
+    private static final String BOOTSTRAP_SERVERS = "xxx:9092,xxx:9092,xxx:9092";
+    // Kafka consumer group id
+    private static final String GROUP_ID = "test_flink_doris_group";
+    // Kafka topic to consume
+    private static final String TOPIC = "test_flink_doris";
+    // Doris host:port given to DorisStreamLoad
+    private static final String DORIS_HOST_PORT = "xxx:8030";
+    // Doris database name
+    private static final String DORIS_DB = "test1";
+    // Doris table name
+    private static final String DORIS_TABLE = "doris_test_source_2";
+    // Doris user name
+    private static final String DORIS_USER = "root";
+    // Doris password
+    private static final String DORIS_PASSWORD = "";
+    // Doris columns passed to DorisSink
+    private static final String DORIS_COLUMNS = "name,age,price,sale";
+    // JSON format (one path per column) passed to DorisSink
+    private static final String JSON_FORMAT = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
+
+    /**
+     * Builds the pipeline and submits it.
+     *
+     * @param args command-line arguments (not used)
+     * @throws Exception if the job cannot be built or executed
+     */
+    public static void main(String[] args) throws Exception {
+        // Kafka consumer settings.
+        Properties kafkaProps = new Properties();
+        kafkaProps.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
+        kafkaProps.setProperty("group.id", GROUP_ID);
+        kafkaProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaProps.setProperty("auto.offset.reset", "earliest");
+        kafkaProps.setProperty("max.poll.records", "10000");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10000);
+        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        // Source: every Kafka record value is read as a plain string.
+        FlinkKafkaConsumer<String> kafkaConsumer =
+                new FlinkKafkaConsumer<>(TOPIC, new SimpleStringSchema(), kafkaProps);
+        DataStreamSource<String> records = env.addSource(kafkaConsumer);
+
+        // Sink: hand each record to Doris stream load.
+        DorisStreamLoad streamLoad =
+                new DorisStreamLoad(DORIS_HOST_PORT, DORIS_DB, DORIS_TABLE, DORIS_USER, DORIS_PASSWORD);
+        records.addSink(new DorisSink(streamLoad, DORIS_COLUMNS, JSON_FORMAT));
+
+        env.execute("flink kafka to doris");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/instructions.md b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/instructions.md
new file mode 100644
index 0000000..2dba5d7
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/instructions.md
@@ -0,0 +1,56 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Introduction
+
+This series of samples shows how to use the Flink Kafka connector to read data from Kafka and write it to Doris
+through stream load (using the custom DorisSink), with code examples.
+
+# Code example
+
+```
+org.apache.doris.demo.flink.kafka.FlinkKafka2Doris
+```
+
+Custom doris sink
+
+```java
+org.apache.doris.demo.flink.DorisSink
+```
+
+Sample program
+
+```
+   public class DorisSink extends RichSinkFunction<String> {
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+    }
+    @Override
+    public void invoke(String value, Context context) throws Exception {
+        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
+        .....
+    }
+}
+```
+
+
+
+

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org