You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/08/27 02:56:05 UTC
[incubator-doris] branch master updated: Flink reads multiple data
sources to doris (#6490)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4cfebc3 Flink reads multiple data sources to doris (#6490)
4cfebc3 is described below
commit 4cfebc35a7a63dd27001f2561a966dee2b3892db
Author: caoliang-web <71...@users.noreply.github.com>
AuthorDate: Fri Aug 27 10:55:53 2021 +0800
Flink reads multiple data sources to doris (#6490)
* Flink reads multiple data sources to doris
Co-authored-by: caol <ca...@shuhaisc.com>
---
samples/doris-demo/flink-demo/pom.xml | 47 ++++
.../org/apache/doris/demo/flink/DorisSink.java | 85 +++++++
.../apache/doris/demo/flink/DorisStreamLoad.java | 163 +++++++++++++
.../org/apache/doris/demo/flink/RespContent.java | 262 +++++++++++++++++++++
.../java/org/apache/doris/demo/flink/User.java | 91 +++++++
.../doris/demo/flink/analyze/DorisSource.java | 97 ++++++++
.../flink/analyze/FlinkDorisConnectorAnalyze.java | 79 +++++++
.../flink/analyze/FlinkJdbcConnectorAnalyze.java | 70 ++++++
.../doris/demo/flink/analyze/instructions.md | 53 +++++
.../flink/hdfs/FlinkCustomHdfsSource2Doris.java | 106 +++++++++
.../demo/flink/hdfs/FlinkReadTextHdfs2Doris.java | 108 +++++++++
.../apache/doris/demo/flink/hdfs/HdfsSource.java | 88 +++++++
.../apache/doris/demo/flink/hdfs/instructions.md | 70 ++++++
.../doris/demo/flink/kafka/FlinkKafka2Doris.java | 87 +++++++
.../apache/doris/demo/flink/kafka/instructions.md | 56 +++++
15 files changed, 1462 insertions(+)
diff --git a/samples/doris-demo/flink-demo/pom.xml b/samples/doris-demo/flink-demo/pom.xml
index 808c7a0..efe2b4c 100644
--- a/samples/doris-demo/flink-demo/pom.xml
+++ b/samples/doris-demo/flink-demo/pom.xml
@@ -34,6 +34,8 @@ under the License.
<scala.binary.version>2.12</scala.binary.version>
<java.version>1.8</java.version>
<flink.version>1.12.2</flink.version>
+ <fastjson.version>1.2.62</fastjson.version>
+ <hadoop.version>2.8.3</hadoop.version>
<scope.mode>compile</scope.mode>
</properties>
@@ -73,6 +75,51 @@ under the License.
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>${fastjson.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId> <!-- Scala suffix must follow scala.binary.version, as the Kafka connector does -->
+ <version>${flink.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2-uber</artifactId>
+ <version>2.6.5-10.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache</groupId>
+ <artifactId>doris-flink</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+
+ <!--After adding the following two dependencies, Flink's log will appear-->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ <version>1.7.25</version>
+ </dependency>
</dependencies>
<build>
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisSink.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisSink.java
new file mode 100644
index 0000000..911acfe
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisSink.java
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Custom doris sink
+ */
+public class DorisSink extends RichSinkFunction<String> {
+
+
+ private static final Logger log = LoggerFactory.getLogger(DorisSink.class);
+
+ private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));
+
+ private DorisStreamLoad dorisStreamLoad;
+
+ private String columns;
+
+ private String jsonFormat;
+
+ public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
+ this.dorisStreamLoad = dorisStreamLoad;
+ this.columns = columns;
+ this.jsonFormat = jsonFormat;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+
+
+ /**
+ * Determine whether StreamLoad is successful
+ *
+ * @param respContent streamLoad returns the entity
+ * @return
+ */
+ public static Boolean checkStreamLoadStatus(RespContent respContent) {
+ if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
+ && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
+ if (loadResponse != null && loadResponse.status == 200) {
+ RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
+ if (!checkStreamLoadStatus(respContent)) {
+ log.error("Stream Load fail{}:", loadResponse);
+ }
+ } else {
+ log.error("Stream Load Request failed:{}", loadResponse);
+ }
+ }
+}
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisStreamLoad.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisStreamLoad.java
new file mode 100644
index 0000000..bc43dbe
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/DorisStreamLoad.java
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.flink;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import java.io.Serializable;
+import java.io.IOException;
+import java.io.BufferedOutputStream;
+import java.io.InputStream;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.Calendar;
+import java.util.UUID;
+
+
+/**
+ * doris streamLoad
+ */
+
+public class DorisStreamLoad implements Serializable {
+
+    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
+
+    /** Stream load URL template; the placeholders are host:port, database and table. */
+    private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load?";
+    private String hostPort;
+    private String db;
+    private String tbl;
+    private String user;
+    private String passwd;
+    private String loadUrlStr;
+    private String authEncoding;
+
+    /** Builds the stream load URL from hostPort, db and tbl, and the Base64 basic-auth token from user and passwd. */
+    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
+        this.hostPort = hostPort;
+        this.db = db;
+        this.tbl = tbl;
+        this.user = user;
+        this.passwd = passwd;
+        this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, db, tbl);
+        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
+    }
+
+    /** Creates a PUT connection to urlStr with the stream load headers set; automatic redirects are disabled. */
+    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
+        URL url = new URL(urlStr);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+        conn.setInstanceFollowRedirects(false);
+        conn.setRequestMethod("PUT");
+        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
+        conn.addRequestProperty("Expect", "100-continue");
+        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
+        conn.addRequestProperty("label", label);
+        conn.addRequestProperty("max_filter_ratio", "0");
+        conn.addRequestProperty("strict_mode", "true");
+        conn.addRequestProperty("columns", columns);
+        conn.addRequestProperty("format", "json");
+        conn.addRequestProperty("jsonpaths", jsonformat);
+        conn.addRequestProperty("strip_outer_array", "true");
+        conn.setDoOutput(true);
+        conn.setDoInput(true);
+        return conn;
+    }
+
+    /** Result of one load attempt: the HTTP status (or -1 when loadBatch caught an exception), message and body. */
+    public static class LoadResponse {
+        public int status;
+        public String respMsg;
+        public String respContent;
+
+        public LoadResponse(int status, String respMsg, String respContent) {
+            this.status = status;
+            this.respMsg = respMsg;
+            this.respContent = respContent;
+        }
+
+        @Override
+        public String toString() {
+            return "status: " + status + ", resp msg: " + respMsg + ", resp content: " + respContent;
+        }
+    }
+
+    /**
+     * Sends one batch: requests the FE, expects a 307 redirect, then PUTs the data to the redirect location.
+     * @param data       request body; encoded as UTF-8 to match the Content-Type header
+     * @param columns    value of the "columns" request header
+     * @param jsonformat value of the "jsonpaths" request header
+     * @return the response of the redirected request, or a response with status -1 if any step throws
+     */
+    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
+        Calendar calendar = Calendar.getInstance();
+        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
+                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
+                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
+                UUID.randomUUID().toString().replaceAll("-", ""));
+
+        HttpURLConnection feConn = null;
+        HttpURLConnection beConn = null;
+        try {
+            // build request and send to fe
+            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
+            int status = feConn.getResponseCode();
+            // fe send back http response code TEMPORARY_REDIRECT 307 and new be location
+            if (status != 307) {
+                throw new IOException("status is not TEMPORARY_REDIRECT 307, status: " + status);
+            }
+            String location = feConn.getHeaderField("Location");
+            if (location == null) {
+                throw new IOException("redirect location is null");
+            }
+            // build request and send to new be location; the stream is closed even if write fails
+            beConn = getConnection(location, label, columns, jsonformat);
+            try (BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream())) {
+                bos.write(data.getBytes(StandardCharsets.UTF_8));
+            }
+            status = beConn.getResponseCode();
+            String respMsg = beConn.getResponseMessage();
+            StringBuilder response = new StringBuilder();
+            try (BufferedReader br = new BufferedReader(
+                    new InputStreamReader(beConn.getInputStream(), StandardCharsets.UTF_8))) {
+                String line;
+                while ((line = br.readLine()) != null) {
+                    response.append(line);
+                }
+            }
+            return new LoadResponse(status, respMsg, response.toString());
+        } catch (Exception e) {
+            String err = "failed to stream load data with label: " + label;
+            log.warn(err, e);
+            return new LoadResponse(-1, e.getMessage(), err);
+        } finally {
+            if (feConn != null) {
+                feConn.disconnect();
+            }
+            if (beConn != null) {
+                beConn.disconnect();
+            }
+        }
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/RespContent.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/RespContent.java
new file mode 100644
index 0000000..1f0385a
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/RespContent.java
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink;
+
+import java.io.Serializable;
+
+/**
+ * Entity returned by streamLoad
+ */
+public class RespContent implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+
+ /**
+ * Transaction ID of the import. The user may not perceive it.
+ */
+ private int TxnId;
+ /**
+ * Import Label. Specified by the user or automatically generated by the system.
+ */
+ private String Label;
+ /**
+ * Import completion status.
+ * "Success": Indicates that the import was successful.
+ * "Publish Timeout": This status also indicates that the import has been completed, but the data may be visible with a delay, and there is no need to retry.
+ * "Label Already Exists": The Label is duplicated, and the Label needs to be replaced.
+ */
+ private String Status;
+ /**
+ * The status of the import job corresponding to the existing Label.
+ * This field will only be displayed when the Status is "Label Already Exists".
+ * The user can know the status of the import job corresponding to the existing Label through this status.
+ * "RUNNING" means that the job is still executing, and "FINISHED" means that the job is successful.
+ */
+ private String ExistingJobStatus;
+ /**
+ * Import error information.
+ */
+ private String Message;
+ /**
+ * The total number of rows processed by the import.
+ */
+ private long NumberTotalRows;
+ /**
+ * The number of rows successfully imported.
+ */
+ private long NumberLoadedRows;
+ /**
+ * The number of rows with unqualified data quality.
+ */
+ private int NumberFilteredRows;
+ /**
+ * The number of rows filtered by the where condition.
+ */
+ private int NumberUnselectedRows;
+ /**
+ * The number of bytes imported.
+ */
+ private long LoadBytes;
+ /**
+ * Import completion time, in milliseconds.
+ */
+ private int LoadTimeMs;
+ /**
+ * The time it takes to request Fe to start a transaction, in milliseconds.
+ */
+ private int BeginTxnTimeMs;
+ /**
+ * The time it takes to request Fe to obtain the import data execution plan, in milliseconds.
+ */
+ private int StreamLoadPutTimeMs;
+ /**
+ * The time spent reading data, in milliseconds.
+ */
+ private int ReadDataTimeMs;
+ /**
+ * The time spent on the data write operation, in milliseconds.
+ */
+ private int WriteDataTimeMs;
+ /**
+ * The time taken to submit and publish the transaction request to Fe, in milliseconds.
+ */
+ private int CommitAndPublishTimeMs;
+ /**
+ * If there is a data quality problem, check the specific error line by visiting this URL.
+ */
+ private String ErrorURL;
+
+ public int getTxnId() {
+ return TxnId;
+ }
+
+ public void setTxnId(int txnId) {
+ TxnId = txnId;
+ }
+
+ public String getLabel() {
+ return Label;
+ }
+
+ public void setLabel(String label) {
+ Label = label;
+ }
+
+ public String getStatus() {
+ return Status;
+ }
+
+ public void setStatus(String status) {
+ Status = status;
+ }
+
+ public String getExistingJobStatus() {
+ return ExistingJobStatus;
+ }
+
+ public void setExistingJobStatus(String existingJobStatus) {
+ ExistingJobStatus = existingJobStatus;
+ }
+
+ public String getMessage() {
+ return Message;
+ }
+
+ public void setMessage(String message) {
+ Message = message;
+ }
+
+ public long getNumberTotalRows() {
+ return NumberTotalRows;
+ }
+
+ public void setNumberTotalRows(long numberTotalRows) {
+ NumberTotalRows = numberTotalRows;
+ }
+
+ public long getNumberLoadedRows() {
+ return NumberLoadedRows;
+ }
+
+ public void setNumberLoadedRows(long numberLoadedRows) {
+ NumberLoadedRows = numberLoadedRows;
+ }
+
+ public int getNumberFilteredRows() {
+ return NumberFilteredRows;
+ }
+
+ public void setNumberFilteredRows(int numberFilteredRows) {
+ NumberFilteredRows = numberFilteredRows;
+ }
+
+ public int getNumberUnselectedRows() {
+ return NumberUnselectedRows;
+ }
+
+ public void setNumberUnselectedRows(int numberUnselectedRows) {
+ NumberUnselectedRows = numberUnselectedRows;
+ }
+
+ public long getLoadBytes() {
+ return LoadBytes;
+ }
+
+ public void setLoadBytes(long loadBytes) {
+ LoadBytes = loadBytes;
+ }
+
+ public int getLoadTimeMs() {
+ return LoadTimeMs;
+ }
+
+ public void setLoadTimeMs(int loadTimeMs) {
+ LoadTimeMs = loadTimeMs;
+ }
+
+ public int getBeginTxnTimeMs() {
+ return BeginTxnTimeMs;
+ }
+
+ public void setBeginTxnTimeMs(int beginTxnTimeMs) {
+ BeginTxnTimeMs = beginTxnTimeMs;
+ }
+
+ public int getStreamLoadPutTimeMs() {
+ return StreamLoadPutTimeMs;
+ }
+
+ public void setStreamLoadPutTimeMs(int streamLoadPutTimeMs) {
+ StreamLoadPutTimeMs = streamLoadPutTimeMs;
+ }
+
+ public int getReadDataTimeMs() {
+ return ReadDataTimeMs;
+ }
+
+ public void setReadDataTimeMs(int readDataTimeMs) {
+ ReadDataTimeMs = readDataTimeMs;
+ }
+
+ public int getWriteDataTimeMs() {
+ return WriteDataTimeMs;
+ }
+
+ public void setWriteDataTimeMs(int writeDataTimeMs) {
+ WriteDataTimeMs = writeDataTimeMs;
+ }
+
+ public int getCommitAndPublishTimeMs() {
+ return CommitAndPublishTimeMs;
+ }
+
+ public void setCommitAndPublishTimeMs(int commitAndPublishTimeMs) {
+ CommitAndPublishTimeMs = commitAndPublishTimeMs;
+ }
+
+ public String getErrorURL() {
+ return ErrorURL;
+ }
+
+ public void setErrorURL(String errorURL) {
+ ErrorURL = errorURL;
+ }
+
+ @Override
+ public String toString() {
+ return "RespContent{" +
+ "TxnId=" + TxnId +
+ ", Label='" + Label + '\'' +
+ ", Status='" + Status + '\'' +
+ ", ExistingJobStatus='" + ExistingJobStatus + '\'' +
+ ", Message='" + Message + '\'' +
+ ", NumberTotalRows=" + NumberTotalRows +
+ ", NumberLoadedRows=" + NumberLoadedRows +
+ ", NumberFilteredRows=" + NumberFilteredRows +
+ ", NumberUnselectedRows=" + NumberUnselectedRows +
+ ", LoadBytes=" + LoadBytes +
+ ", LoadTimeMs=" + LoadTimeMs +
+ ", BeginTxnTimeMs=" + BeginTxnTimeMs +
+ ", StreamLoadPutTimeMs=" + StreamLoadPutTimeMs +
+ ", ReadDataTimeMs=" + ReadDataTimeMs +
+ ", WriteDataTimeMs=" + WriteDataTimeMs +
+ ", CommitAndPublishTimeMs=" + CommitAndPublishTimeMs +
+ ", ErrorURL='" + ErrorURL + '\'' +
+ '}';
+ }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/User.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/User.java
new file mode 100644
index 0000000..3ce8f15
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/User.java
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink;
+
+
+import org.apache.doris.flink.deserialization.DorisDeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.List;
+
+public class User implements DorisDeserializationSchema {
+
+ private String name;
+
+ private Integer age;
+
+ private String price;
+
+ private String sale;
+
+ public User(String name, Integer age, String price, String sale) {
+ this.name = name;
+ this.age = age;
+ this.price = price;
+ this.sale = sale;
+ }
+
+ public User() {
+ }
+
+ public void setAge(Integer age) {
+ this.age = age;
+ }
+
+ public Integer getAge() {
+ return age;
+ }
+
+ public String getPrice() {
+ return price;
+ }
+
+ public void setPrice(String price) {
+ this.price = price;
+ }
+
+ public String getSale() {
+ return sale;
+ }
+
+ public void setSale(String sale) {
+ this.sale = sale;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String toString() {
+ return "User{" +
+ "name='" + name + '\'' +
+ ", age=" + age +
+ ", price='" + price + '\'' +
+ ", sale='" + sale + '\'' +
+ '}';
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(List.class);
+ }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/DorisSource.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/DorisSource.java
new file mode 100644
index 0000000..5432d90
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/DorisSource.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.analyze;
+
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * custom flink jdbc source
+ */
+public class DorisSource extends RichSourceFunction<User> {
+ private static final Logger logger = LoggerFactory.getLogger(DorisSource.class);
+
+ private Connection connection = null;
+ private PreparedStatement ps = null;
+ private Properties properties;
+
+
+ public DorisSource(Properties properties){
+ this.properties=properties;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ Class.forName("com.mysql.jdbc.Driver");
+ connection = DriverManager.getConnection(properties.getProperty("url"),
+ properties.getProperty("username"),
+ properties.getProperty("password"));
+ ps = connection.prepareStatement(properties.getProperty("sql"));
+ }
+
+
+ @Override
+ public void run(SourceContext<User> sourceContext) throws Exception {
+ try {
+ ResultSet resultSet = ps.executeQuery();
+ while (resultSet.next()) {
+ User user = new User();
+ user.setName(resultSet.getString("name"));
+ user.setAge(resultSet.getInt("age"));
+ user.setPrice(resultSet.getString("price"));
+ user.setSale(resultSet.getString("sale"));
+ sourceContext.collect(user);
+ }
+ } catch (Exception e) {
+ logger.error("runException:{}", e);
+ } finally {
+ connection.close();
+ }
+
+ }
+
+ @Override
+ public void cancel() {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ if (ps != null) {
+ try {
+ ps.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
+
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkDorisConnectorAnalyze.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkDorisConnectorAnalyze.java
new file mode 100644
index 0000000..54589f4
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkDorisConnectorAnalyze.java
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.analyze;
+
+import org.apache.doris.demo.flink.User;
+import org.apache.doris.flink.cfg.DorisStreamOptions;
+import org.apache.doris.flink.datastream.DorisSourceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+
+/**
+ * This example mainly demonstrates the use of Flink connector to read Doris data,
+ * build DataStream for analysis, and sum.
+ * <p>
+ * use flink doris connector
+ */
+public class FlinkDorisConnectorAnalyze {
+
+ public static void main(String[] args) throws Exception {
+ Properties properties = new Properties();
+ properties.put("fenodes", "IP:8030");
+ properties.put("username", "root");
+ properties.put("password", "");
+ properties.put("table.identifier", "test1.doris_test_source_2");
+
+ DorisStreamOptions options = new DorisStreamOptions(properties);
+
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ DataStreamSource dataStreamSource = env.addSource(new DorisSourceFunction(options, new User()));
+
+ SingleOutputStreamOperator outputStream = dataStreamSource.map(new MapFunction<Object, User>() {
+ @Override
+ public User map(Object obj) throws Exception {
+ User user = new User();
+ if (obj instanceof ArrayList<?>) {
+ user.setName(((ArrayList<?>) obj).get(0).toString());
+ user.setAge(Integer.valueOf(((ArrayList<?>) obj).get(1).toString()));
+ user.setPrice(((ArrayList<?>) obj).get(2).toString());
+ user.setSale(((ArrayList<?>) obj).get(3).toString());
+ }
+ return user;
+ }
+ });
+
+ outputStream.keyBy(new KeySelector<User, String>() {
+ @Override
+ public String getKey(User user) throws Exception {
+ return user.getName();
+ }
+ })
+ .sum("age")
+ .print();
+
+ env.execute("Flink doris connector analyze test");
+ }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkJdbcConnectorAnalyze.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkJdbcConnectorAnalyze.java
new file mode 100644
index 0000000..3912f5b
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/FlinkJdbcConnectorAnalyze.java
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.analyze;
+
+
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.Properties;
+
+/**
+ * This example mainly demonstrates the use of Flink connector to read Doris data,
+ * build DataStream for analysis, and sum.
+ * <p>
+ * Use custom jdbc
+ */
+
+public class FlinkJdbcConnectorAnalyze {
+    /** Reads users from Doris through the custom JDBC source, groups them by name and prints the reduced result. */
+    public static void main(String[] args) throws Exception {
+        Properties jdbcProps = new Properties();
+        jdbcProps.put("url", "jdbc:mysql://IP:9030");
+        jdbcProps.put("username", "root");
+        jdbcProps.put("password", "");
+        jdbcProps.put("sql", "select * from test1.doris_test_source_2");
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        DataStreamSource<User> users = env.addSource(new DorisSource(jdbcProps));
+
+        // group the records by user name
+        KeyedStream<User, String> usersByName = users.keyBy(new KeySelector<User, String>() {
+            @Override
+            public String getKey(User user) throws Exception {
+                return user.getName();
+            }
+        });
+
+        // combine two users of one group: ages are added, the other fields come from the first user
+        SingleOutputStreamOperator<User> summed = usersByName.reduce(new ReduceFunction<User>() {
+            @Override
+            public User reduce(User t0, User t1) throws Exception {
+                return new User(t0.getName(), t0.getAge() + t1.getAge(), t0.getPrice(), t0.getSale());
+            }
+        });
+
+        summed.print();
+
+        env.execute("flink custom source analyze test");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/instructions.md b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/instructions.md
new file mode 100644
index 0000000..c519502
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/analyze/instructions.md
@@ -0,0 +1,53 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Introduction
+
+This series of samples shows how to read data from Doris with either the Flink JDBC connector or the Flink Doris
+connector, build a DataStream from it, and run an analysis on that stream. Code examples based on actual usage
+scenarios are provided.
+
+# Method to realize
+
+1. Flink Doris connector (recommended)
+
+Realized through flink doris connector
+
+**Note:** The Flink Doris connector jar is not published to the Maven Central repository, so you need to compile it
+yourself and add it to the classpath of your project. For how to compile and use it, see
+[Flink Doris connector](https://doris.apache.org/master/zh-CN/extending-doris/flink-doris-connector.html).
+
+2. Flink JDBC connector
+
+First, load the Doris-related configuration:
+
+ ```
+ properties.put("url", "jdbc:mysql://ip:9030");
+ properties.put("username", "root");
+ properties.put("password", "");
+ properties.put("sql", "select * from db.tb");
+ ```
+Second, read the data with the custom source:
+
+```java
+org.apache.doris.demo.flink.analyze.DorisSource
+```
+
+
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkCustomHdfsSource2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkCustomHdfsSource2Doris.java
new file mode 100644
index 0000000..ba7562b
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkCustomHdfsSource2Doris.java
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.hdfs;
+
+
+import com.alibaba.fastjson.JSON;
+import org.apache.doris.demo.flink.DorisSink;
+import org.apache.doris.demo.flink.DorisStreamLoad;
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Demo job that streams the lines of an HDFS file through the custom {@link HdfsSource},
+ * converts every data line into a one-element JSON array of {@link User} and writes it
+ * to the configured Doris table with stream load.
+ * <p>
+ * Supported file types: txt, csv. Each data line must contain the four comma separated
+ * columns listed in {@code header}; the header line itself and blank lines are skipped.
+ */
+public class FlinkCustomHdfsSource2Doris {
+    //hdfs path
+    private static final String path = "hdfs://IP:8020/tmp/2.csv";
+    //hdfs default scheme
+    private static final String fsDefaultScheme = "hdfs://IP:8020";
+    //doris ip port
+    private static final String hostPort = "IP:8030";
+    //doris dbName
+    private static final String dbName = "test1";
+    //doris tbName
+    private static final String tbName = "doris_test_source_2";
+    //doris userName
+    private static final String userName = "root";
+    //doris password
+    private static final String password = "";
+    //doris columns
+    private static final String columns = "name,age,price,sale";
+    //json format
+    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
+    //header
+    private final static String header = "name,age,price,sale";
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        blinkStreamEnv.enableCheckpointing(10000);
+        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        //Flink custom source
+        Configuration configuration = new Configuration();
+        configuration.setString("fs.default-scheme", fsDefaultScheme);
+        HdfsSource hdfsSource = new HdfsSource(configuration, path);
+        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(hdfsSource);
+
+        //drop the header line and blank lines, then turn every remaining line into JSON
+        SingleOutputStreamOperator<String> resultData = dataStreamSource.filter(new FilterFunction<String>() {
+            @Override
+            public boolean filter(String line) throws Exception {
+                // a blank line (for example a trailing newline at the end of the file)
+                // has no columns and would make the parser below fail
+                return !line.trim().isEmpty() && !header.equals(line);
+            }
+        }).map(new MapFunction<String, String>() {
+            @Override
+            public String map(String line) throws Exception {
+                // limit -1 keeps trailing empty columns, so "a,1,2," still yields four fields
+                String[] fields = line.split(",", -1);
+                if (fields.length < 4) {
+                    throw new IllegalArgumentException(
+                            "expected 4 comma separated fields (" + header + ") but got: " + line);
+                }
+                User user = new User();
+                user.setName(fields[0]);
+                user.setAge(Integer.valueOf(fields[1]));
+                user.setPrice(fields[2]);
+                user.setSale(fields[3]);
+                // the sink receives a JSON array, so wrap the single record in a list
+                List<User> users = new ArrayList<>();
+                users.add(user);
+                return JSON.toJSONString(users);
+            }
+        });
+
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
+
+        resultData.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));
+
+        blinkStreamEnv.execute("flink custom hdfs source to doris");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkReadTextHdfs2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkReadTextHdfs2Doris.java
new file mode 100644
index 0000000..5a36371
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/FlinkReadTextHdfs2Doris.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.hdfs;
+
+
+import com.alibaba.fastjson.JSON;
+import org.apache.doris.demo.flink.DorisSink;
+import org.apache.doris.demo.flink.DorisStreamLoad;
+import org.apache.doris.demo.flink.User;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.List;
+
+
+/**
+ * Demo job that reads an HDFS file line by line with Flink's {@code readTextFile},
+ * converts every data line into a one-element JSON array of {@link User} and writes it
+ * to the configured Doris table with stream load. The converted records are also printed.
+ * <p>
+ * Supported file types: txt, csv. Each data line must contain the four comma separated
+ * columns listed in {@code header}; the header line itself and blank lines are skipped.
+ */
+public class FlinkReadTextHdfs2Doris {
+    //hdfs path
+    private static final String path = "hdfs://IP:8020/tmp/2.csv";
+    //doris ip port
+    private static final String hostPort = "IP:8030";
+    //doris dbName
+    private static final String dbName = "test1";
+    //doris tbName
+    private static final String tbName = "doris_test_source_2";
+    //doris userName
+    private static final String userName = "root";
+    //doris password
+    private static final String password = "";
+    //doris columns
+    private static final String columns = "name,age,price,sale";
+    //json format
+    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
+    //header
+    private final static String header = "name,age,price,sale";
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+        blinkStreamEnv.enableCheckpointing(10000);
+        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        //flink readTextFile
+        DataStreamSource<String> dataStreamSource = blinkStreamEnv.readTextFile(path);
+
+        //drop the header line and blank lines, then turn every remaining line into JSON
+        SingleOutputStreamOperator<String> resultData = dataStreamSource.filter(new FilterFunction<String>() {
+            @Override
+            public boolean filter(String line) throws Exception {
+                // a blank line (for example a trailing newline at the end of the file)
+                // has no columns and would make the parser below fail
+                return !line.trim().isEmpty() && !header.equals(line);
+            }
+        }).map(new MapFunction<String, String>() {
+            @Override
+            public String map(String line) throws Exception {
+                // limit -1 keeps trailing empty columns, so "a,1,2," still yields four fields
+                String[] fields = line.split(",", -1);
+                if (fields.length < 4) {
+                    throw new IllegalArgumentException(
+                            "expected 4 comma separated fields (" + header + ") but got: " + line);
+                }
+                User user = new User();
+                user.setName(fields[0]);
+                user.setAge(Integer.valueOf(fields[1]));
+                user.setPrice(fields[2]);
+                user.setSale(fields[3]);
+                // the sink receives a JSON array, so wrap the single record in a list
+                List<User> users = new ArrayList<>();
+                users.add(user);
+                return JSON.toJSONString(users);
+            }
+        });
+
+        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
+
+        resultData.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));
+
+        resultData.print();
+
+        blinkStreamEnv.execute("flink readText hdfs file to doris");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/HdfsSource.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/HdfsSource.java
new file mode 100644
index 0000000..d376b92
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/HdfsSource.java
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.hdfs;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+
+/**
+ * Custom source that opens a single file through Flink's {@link FileSystem} abstraction
+ * and emits every line of it as one record. The source finishes once the end of the file
+ * is reached or the job is cancelled.
+ */
+public class HdfsSource extends RichSourceFunction<String> {
+
+    // Flink configuration used to initialize the FileSystem (e.g. carries fs.default-scheme)
+    private final Configuration configuration;
+    // path of the file to read
+    private final String path;
+    // cleared by cancel() so that the read loop stops; volatile because cancel()
+    // is invoked from a different thread than run()
+    private volatile boolean running = true;
+
+    /**
+     * @param configuration Flink configuration passed to {@link FileSystem#initialize}
+     * @param path          path of the file whose lines are emitted
+     */
+    public HdfsSource(Configuration configuration, String path) {
+        this.configuration = configuration;
+        this.path = path;
+    }
+
+    /**
+     * Initializes the file system from the configuration and emits the file line by line.
+     */
+    @Override
+    public void run(SourceContext<String> sourceContext) throws Exception {
+        FileSystem.initialize(configuration, null);
+        FileSystem fileSystem = FileSystem.get(FileSystem.getDefaultFsUri());
+        readHdfsFile(sourceContext, fileSystem);
+    }
+
+    /**
+     * Asks the read loop to stop after the line that is currently being processed.
+     */
+    @Override
+    public void cancel() {
+        running = false;
+    }
+
+    /**
+     * Reads the configured file as UTF-8 text and collects every line into the source context.
+     *
+     * @param sourceContext context the lines are emitted to
+     * @param fs            file system used to open the file
+     * @throws RuntimeException if the file cannot be opened, read or closed; the
+     *                          underlying {@link IOException} is attached as the cause
+     */
+    public void readHdfsFile(SourceContext<String> sourceContext, FileSystem fs) {
+        // closing the reader also closes the wrapped input stream;
+        // the charset is explicit so the result does not depend on the platform default
+        try (BufferedReader bufferedReader = new BufferedReader(
+                new InputStreamReader(fs.open(new Path(path)), java.nio.charset.StandardCharsets.UTF_8))) {
+            String line;
+            while (running && (line = bufferedReader.readLine()) != null) {
+                sourceContext.collect(line);
+            }
+        } catch (IOException e) {
+            // keep the cause so the real reason of the failure is visible in the job log
+            throw new RuntimeException("read hdfs file fail: " + path, e);
+        }
+    }
+
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/instructions.md b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/instructions.md
new file mode 100644
index 0000000..e8a62b1
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/hdfs/instructions.md
@@ -0,0 +1,70 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Introduction
+
+This series of samples shows how to use Flink to read HDFS files (txt and csv) and write them to Doris through
+Stream Load (Doris's HTTP import method). Code examples are provided.
+
+# Method to realize
+
+1. Flink readTextFile (recommended)
+
+```java
+org.apache.doris.demo.flink.hdfs.FlinkReadTextHdfs2Doris
+```
+
+Sample program
+
+```
+DataStreamSource<String> dataStreamSource = blinkStreamEnv.readTextFile("hdfs://xxx:8020/test/txt");
+...
+```
+
+2. Flink custom HDFS source. Implementation code:
+
+```java
+org.apache.doris.demo.flink.hdfs.HdfsSource
+```
+
+Custom doris sink
+
+```java
+org.apache.doris.demo.flink.DorisSink
+```
+
+Sample program
+
+```
+ public class DorisSink extends RichSinkFunction<String> {
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
+ .....
+ }
+}
+```
+
+
+
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/FlinkKafka2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/FlinkKafka2Doris.java
new file mode 100644
index 0000000..9adf0dd
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/FlinkKafka2Doris.java
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.demo.flink.kafka;
+
+
+import org.apache.doris.demo.flink.DorisSink;
+import org.apache.doris.demo.flink.DorisStreamLoad;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+import java.util.Properties;
+
+/**
+ * Demo job that consumes string records from a Kafka topic and writes them unchanged
+ * to the configured Doris table with stream load.
+ * <p>
+ * Every Kafka record is expected to be a JSON array of objects whose keys match the
+ * configured columns, for example: [{"name":"root","age":18,"price":"1.5","sale":"2"}]
+ */
+public class FlinkKafka2Doris {
+    //kafka address
+    private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
+    //kafka groupName
+    private static final String groupName = "test_flink_doris_group";
+    //kafka topicName
+    private static final String topicName = "test_flink_doris";
+    //doris ip port
+    private static final String hostPort = "xxx:8030";
+    //doris dbName
+    private static final String dbName = "test1";
+    //doris tbName
+    private static final String tbName = "doris_test_source_2";
+    //doris userName
+    private static final String userName = "root";
+    //doris password
+    private static final String password = "";
+    //doris columns
+    private static final String columns = "name,age,price,sale";
+    //json format
+    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";
+
+    /** Builds the consumer settings handed to the Kafka source. */
+    private static Properties kafkaProperties() {
+        Properties kafkaProps = new Properties();
+        kafkaProps.put("bootstrap.servers", bootstrapServer);
+        kafkaProps.put("group.id", groupName);
+        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        kafkaProps.put("auto.offset.reset", "earliest");
+        kafkaProps.put("max.poll.records", "10000");
+        return kafkaProps;
+    }
+
+    public static void main(String[] args) throws Exception {
+        Properties kafkaProps = kafkaProperties();
+
+        // checkpoint every 10 seconds and keep the externalized checkpoint when the job is cancelled
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.enableCheckpointing(10000);
+        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
+        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+        FlinkKafkaConsumer<String> kafkaSource =
+                new FlinkKafkaConsumer<>(topicName, new SimpleStringSchema(), kafkaProps);
+        DataStreamSource<String> kafkaRecords = env.addSource(kafkaSource);
+
+        // kafka records go to Doris as they are
+        DorisStreamLoad streamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);
+        kafkaRecords.addSink(new DorisSink(streamLoad, columns, jsonFormat));
+
+        env.execute("flink kafka to doris");
+    }
+}
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/instructions.md b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/instructions.md
new file mode 100644
index 0000000..2dba5d7
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/kafka/instructions.md
@@ -0,0 +1,56 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Introduction
+
+This series of samples shows how to use the Flink Kafka connector to read data from Kafka and write it to Doris
+through Stream Load (implemented by the custom DorisSink). Code examples are provided.
+
+# Code example
+
+```
+org.apache.doris.demo.flink.kafka.FlinkKafka2Doris
+```
+
+Custom doris sink
+
+```java
+org.apache.doris.demo.flink.DorisSink
+```
+
+Sample program
+
+```
+ public class DorisSink extends RichSinkFunction<String> {
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ }
+ @Override
+ public void invoke(String value, Context context) throws Exception {
+ DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
+ .....
+ }
+}
+```
+
+
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org