You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/05 06:33:46 UTC
[incubator-doris] branch master updated: [Demo] Add stream load
demo (#6344)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 216295d [Demo] Add stream load demo (#6344)
216295d is described below
commit 216295d1b8500a02e954c2649a57f0bd8616b9aa
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Thu Aug 5 14:33:39 2021 +0800
[Demo] Add stream load demo (#6344)
add stream load demo
---
.gitignore | 1 +
samples/doris-demo/pom.xml | 22 +++
.../stream-load-demo/docs/instructions.md | 74 +++++++++
samples/doris-demo/stream-load-demo/pom.xml | 38 +++--
.../doris/demo/stream/DorisStreamLoader.java | 170 +++++++++++++++++++++
5 files changed, 296 insertions(+), 9 deletions(-)
diff --git a/.gitignore b/.gitignore
index e5b8456..5c4b0cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -66,3 +66,4 @@ rpc_data/
#ignore vscode project file
.vscode
+samples/doris-demo/stream-load-demo/target
diff --git a/samples/doris-demo/pom.xml b/samples/doris-demo/pom.xml
index 5c6501e..b62b746 100644
--- a/samples/doris-demo/pom.xml
+++ b/samples/doris-demo/pom.xml
@@ -41,4 +41,26 @@ under the License.
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.3.0</version>
+ <type>maven-plugin</type>
+ </dependency>
+
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ <version>5.1.41</version>
+ </dependency>
+
+ <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.5.13</version>
+ </dependency>
+ </dependencies>
+
</project>
diff --git a/samples/doris-demo/stream-load-demo/docs/instructions.md b/samples/doris-demo/stream-load-demo/docs/instructions.md
new file mode 100644
index 0000000..6ad2c5b
--- /dev/null
+++ b/samples/doris-demo/stream-load-demo/docs/instructions.md
@@ -0,0 +1,74 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Example table structure
+
+
+
+```sql
+CREATE TABLE `doris_test_sink` (
+ `id` int NULL COMMENT "",
+ `number` int NULL COMMENT "",
+ `price` DECIMAL(12,2) NULL COMMENT "",
+ `skuname` varchar(40) NULL COMMENT "",
+ `skudesc` varchar(200) NULL COMMENT ""
+) ENGINE=OLAP
+DUPLICATE KEY(`id`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "3",
+"in_memory" = "false",
+"storage_format" = "V2"
+);
+```
+
+
+
+# Import text sample data
+
+```
+10001,12,13.3, test1,this is atest
+10002,100,15.3,test2,this is atest
+10003,102,16.3,test3,this is atest
+10004,120,17.3,test4,this is atest
+10005,23,10.3, test5,this is atest
+10006,24,112.3,test6,this is atest
+10007,26,13.3, test7,this is atest
+10008,29,145.3,test8,this is atest
+10009,30,16.3, test9,this is atest
+100010,32,18.3,test10,this is atest
+100011,52,18.3,test11,this is atest
+100012,62,10.3,test12,this is atest
+```
+
+
+
+# Import json sample data
+
+```json
+{
+ "id":556393582,
+ "number":"123344",
+ "price":"23.5",
+ "skuname":"test",
+ "skudesc":"zhangfeng_test,test"
+}
+```
+
diff --git a/samples/doris-demo/stream-load-demo/pom.xml b/samples/doris-demo/stream-load-demo/pom.xml
index 2ac4e3a..a296036 100644
--- a/samples/doris-demo/stream-load-demo/pom.xml
+++ b/samples/doris-demo/stream-load-demo/pom.xml
@@ -37,13 +37,33 @@ under the License.
<maven.compiler.target>8</maven.compiler.target>
</properties>
- <dependencies>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.11</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.3.0</version>
+ <configuration>
+ <appendAssemblyId>false</appendAssemblyId>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <archive>
+ <manifest>
+ <mainClass>cmy.test.mysql.DorisStreamLoader</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/samples/doris-demo/stream-load-demo/src/main/java/org/apache/doris/demo/stream/DorisStreamLoader.java b/samples/doris-demo/stream-load-demo/src/main/java/org/apache/doris/demo/stream/DorisStreamLoader.java
new file mode 100644
index 0000000..ec2f9fb
--- /dev/null
+++ b/samples/doris-demo/stream-load-demo/src/main/java/org/apache/doris/demo/stream/DorisStreamLoader.java
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.stream;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+
+/**
+ * This example mainly demonstrates how to use stream load to import data
+ * Including file type (CSV) and data in JSON format
+ *
+ */
+public class DorisStreamLoader {
+ // FE IP Address
+ private final static String HOST = "10.220.146.10";
+ // FE port
+ private final static int PORT = 8030;
+ // db name
+ private final static String DATABASE = "test_2";
+ // table name
+ private final static String TABLE = "doris_test_sink";
+ //user name
+ private final static String USER = "root";
+ //user password
+ private final static String PASSWD = "";
+ //The path of the local file to be imported
+ private final static String LOAD_FILE_NAME = "c:/es/1.csv";
+
+ //http path of stream load task submission
+ private final static String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
+ HOST, PORT, DATABASE, TABLE);
+
+ //Build http client builder
+ private final static HttpClientBuilder httpClientBuilder = HttpClients
+ .custom()
+ .setRedirectStrategy(new DefaultRedirectStrategy() {
+ @Override
+ protected boolean isRedirectable(String method) {
+ // If the connection target is FE, you need to deal with 307 redirect。
+ return true;
+ }
+ });
+
+ /**
+ * File import
+ * @param file
+ * @throws Exception
+ */
+ public void load(File file) throws Exception {
+ try (CloseableHttpClient client = httpClientBuilder.build()) {
+ HttpPut put = new HttpPut(loadUrl);
+ put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
+ put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
+ put.setHeader(HttpHeaders.EXPECT, "100-continue");
+ put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
+
+ // You can set stream load related properties in the Header, here we set label and column_separator.
+ put.setHeader("label", UUID.randomUUID().toString());
+ put.setHeader("column_separator", ",");
+
+ // Set up the import file. Here you can also use StringEntity to transfer arbitrary data.
+ FileEntity entity = new FileEntity(file);
+ put.setEntity(entity);
+
+ try (CloseableHttpResponse response = client.execute(put)) {
+ String loadResult = "";
+ if (response.getEntity() != null) {
+ loadResult = EntityUtils.toString(response.getEntity());
+ }
+
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
+ }
+ System.out.println("Get load result: " + loadResult);
+ }
+ }
+ }
+
+ /**
+ * JSON import
+ * @param jsonData
+ * @throws Exception
+ */
+ public void loadJson(String jsonData) throws Exception {
+ try (CloseableHttpClient client = httpClientBuilder.build()) {
+ HttpPut put = new HttpPut(loadUrl);
+ put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
+ put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
+ put.setHeader(HttpHeaders.EXPECT, "100-continue");
+ put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
+
+ // You can set stream load related properties in the Header, here we set label and column_separator.
+ put.setHeader("label", UUID.randomUUID().toString());
+ put.setHeader("column_separator", ",");
+ put.setHeader("format", "json");
+
+ // Set up the import file. Here you can also use StringEntity to transfer arbitrary data.
+ StringEntity entity = new StringEntity(jsonData);
+ put.setEntity(entity);
+
+ try (CloseableHttpResponse response = client.execute(put)) {
+ String loadResult = "";
+ if (response.getEntity() != null) {
+ loadResult = EntityUtils.toString(response.getEntity());
+ }
+
+ final int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode != 200) {
+ throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
+ }
+ System.out.println("Get load result: " + loadResult);
+ }
+ }
+ }
+
+ /**
+ * Construct authentication information, the authentication method used by doris here is Basic Auth
+ * @param username
+ * @param password
+ * @return
+ */
+ private String basicAuthHeader(String username, String password) {
+ final String tobeEncode = username + ":" + password;
+ byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
+ return "Basic " + new String(encoded);
+ }
+
+
+ public static void main(String[] args) throws Exception {
+ DorisStreamLoader loader = new DorisStreamLoader();
+ //file load
+ //File file = new File(LOAD_FILE_NAME);
+ //loader.load(file);
+ //json load
+ String jsonData = "{\"id\":556393582,\"number\":\"123344\",\"price\":\"23.5\",\"skuname\":\"test\",\"skudesc\":\"zhangfeng_test,test\"}";
+ loader.loadJson(jsonData);
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org