You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/08/05 06:33:46 UTC

[incubator-doris] branch master updated: [Demo] Add stream load demo (#6344)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 216295d  [Demo] Add stream load demo (#6344)
216295d is described below

commit 216295d1b8500a02e954c2649a57f0bd8616b9aa
Author: jiafeng.zhang <zh...@gmail.com>
AuthorDate: Thu Aug 5 14:33:39 2021 +0800

    [Demo] Add stream load demo (#6344)
    
    add stream load demo
---
 .gitignore                                         |   1 +
 samples/doris-demo/pom.xml                         |  22 +++
 .../stream-load-demo/docs/instructions.md          |  74 +++++++++
 samples/doris-demo/stream-load-demo/pom.xml        |  38 +++--
 .../doris/demo/stream/DorisStreamLoader.java       | 170 +++++++++++++++++++++
 5 files changed, 296 insertions(+), 9 deletions(-)

diff --git a/.gitignore b/.gitignore
index e5b8456..5c4b0cd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -66,3 +66,4 @@ rpc_data/
 
 #ignore vscode project file
 .vscode
+samples/doris-demo/stream-load-demo/target
diff --git a/samples/doris-demo/pom.xml b/samples/doris-demo/pom.xml
index 5c6501e..b62b746 100644
--- a/samples/doris-demo/pom.xml
+++ b/samples/doris-demo/pom.xml
@@ -41,4 +41,26 @@ under the License.
     <maven.compiler.target>1.8</maven.compiler.target>
   </properties>
 
+  <dependencies>
+    <!-- Dependencies declared in this parent POM are inherited by its child modules and are
+         bundled into their jar-with-dependencies assemblies. maven-assembly-plugin is a build
+         plugin, not a library, so it is configured in the <build><plugins> section of the
+         modules that use it rather than declared here as a dependency. -->
+
+    <dependency>
+      <groupId>mysql</groupId>
+      <artifactId>mysql-connector-java</artifactId>
+      <version>5.1.41</version>
+    </dependency>
+
+    <!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.5.13</version>
+    </dependency>
+  </dependencies>
+
 </project>
diff --git a/samples/doris-demo/stream-load-demo/docs/instructions.md b/samples/doris-demo/stream-load-demo/docs/instructions.md
new file mode 100644
index 0000000..6ad2c5b
--- /dev/null
+++ b/samples/doris-demo/stream-load-demo/docs/instructions.md
@@ -0,0 +1,74 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Example table structure 
+
+
+
+```sql
+CREATE TABLE `doris_test_sink` (
+  `id` int NULL COMMENT "",
+  `number` int NULL COMMENT "",
+  `price` DECIMAL(12,2) NULL COMMENT "",
+  `skuname` varchar(40) NULL COMMENT "",
+  `skudesc` varchar(200) NULL COMMENT ""
+) ENGINE=OLAP
+DUPLICATE KEY(`id`)
+COMMENT "OLAP"
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+"replication_num" = "3",
+"in_memory" = "false",
+"storage_format" = "V2"
+);
+```
+
+
+
+# Sample CSV data to import
+
+```
+10001,12,13.3,test1,this is a test
+10002,100,15.3,test2,this is a test
+10003,102,16.3,test3,this is a test
+10004,120,17.3,test4,this is a test
+10005,23,10.3,test5,this is a test
+10006,24,112.3,test6,this is a test
+10007,26,13.3,test7,this is a test
+10008,29,145.3,test8,this is a test
+10009,30,16.3,test9,this is a test
+100010,32,18.3,test10,this is a test
+100011,52,18.3,test11,this is a test
+100012,62,10.3,test12,this is a test
+```
+
+
+
+# Sample JSON data to import
+
+```json
+{    
+    "id":556393582,
+    "number":"123344",
+    "price":"23.5",
+    "skuname":"test",
+    "skudesc":"zhangfeng_test,test"
+}
+```
+
diff --git a/samples/doris-demo/stream-load-demo/pom.xml b/samples/doris-demo/stream-load-demo/pom.xml
index 2ac4e3a..a296036 100644
--- a/samples/doris-demo/stream-load-demo/pom.xml
+++ b/samples/doris-demo/stream-load-demo/pom.xml
@@ -37,13 +37,33 @@ under the License.
         <maven.compiler.target>8</maven.compiler.target>
     </properties>
 
-    <dependencies>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.11</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
+    <build>
+        <plugins>
+            <!-- Packages the demo and its dependencies (httpclient etc.) into one runnable jar. -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.3.0</version>
+                <configuration>
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <archive>
+                        <manifest>
+                            <!-- Fully qualified name of the demo entry point; must match the
+                                 package declared in DorisStreamLoader.java. -->
+                            <mainClass>org.apache.doris.demo.stream.DorisStreamLoader</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/samples/doris-demo/stream-load-demo/src/main/java/org/apache/doris/demo/stream/DorisStreamLoader.java b/samples/doris-demo/stream-load-demo/src/main/java/org/apache/doris/demo/stream/DorisStreamLoader.java
new file mode 100644
index 0000000..ec2f9fb
--- /dev/null
+++ b/samples/doris-demo/stream-load-demo/src/main/java/org/apache/doris/demo/stream/DorisStreamLoader.java
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.stream;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.FileEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.DefaultRedirectStrategy;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.util.EntityUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+
+/**
+ * Demonstrates how to import data into Doris with Stream Load over HTTP.
+ *
+ * <p>Two variants are shown: loading a local CSV file ({@link #load(File)}) and loading a
+ * JSON document held in a string ({@link #loadJson(String)}). Both send an HTTP PUT to the
+ * FE's {@code /api/{db}/{table}/_stream_load} endpoint.
+ */
+public class DorisStreamLoader {
+    // FE IP address
+    private static final String HOST = "10.220.146.10";
+    // FE HTTP port
+    private static final int PORT = 8030;
+    // database name
+    private static final String DATABASE = "test_2";
+    // table name
+    private static final String TABLE = "doris_test_sink";
+    // user name
+    private static final String USER = "root";
+    // user password
+    private static final String PASSWD = "";
+    // path of the local file to be imported
+    private static final String LOAD_FILE_NAME = "c:/es/1.csv";
+
+    // HTTP URL the stream load request is submitted to
+    private static final String loadUrl = String.format("http://%s:%s/api/%s/%s/_stream_load",
+            HOST, PORT, DATABASE, TABLE);
+
+    // Builder for the HTTP client used by each load.
+    private static final HttpClientBuilder httpClientBuilder = HttpClients
+            .custom()
+            .setRedirectStrategy(new DefaultRedirectStrategy() {
+                @Override
+                protected boolean isRedirectable(String method) {
+                    // If the connection target is the FE, the 307 redirect it returns must be
+                    // followed for PUT requests too, so every method is treated as redirectable.
+                    return true;
+                }
+            });
+
+    /**
+     * Imports a local CSV file whose columns are separated by commas.
+     *
+     * @param file the file to upload
+     * @throws Exception if the request fails or the server does not answer with HTTP 200
+     */
+    public void load(File file) throws Exception {
+        HttpPut put = newLoadRequest();
+        // CSV-specific stream load property: the column separator used in the file.
+        put.setHeader("column_separator", ",");
+        // The file is the request body; a StringEntity could be used to send arbitrary data.
+        put.setEntity(new FileEntity(file));
+        execute(put);
+    }
+
+    /**
+     * Imports a JSON document.
+     *
+     * <p>Only the {@code format} property is set, so a single JSON object is expected. To load a
+     * JSON array of objects, additionally set the {@code strip_outer_array} header to {@code true}.
+     *
+     * @param jsonData the JSON document to upload
+     * @throws Exception if the request fails or the server does not answer with HTTP 200
+     */
+    public void loadJson(String jsonData) throws Exception {
+        HttpPut put = newLoadRequest();
+        // column_separator only applies to CSV data, so it is not sent for JSON loads.
+        put.setHeader("format", "json");
+        // Encode explicitly as UTF-8: StringEntity(String) defaults to ISO-8859-1, which would
+        // turn any non-Latin-1 character in the JSON into '?'.
+        put.setEntity(new StringEntity(jsonData, StandardCharsets.UTF_8));
+        execute(put);
+    }
+
+    /**
+     * Creates a PUT request to the stream load URL carrying the headers shared by every load:
+     * Expect: 100-continue, Basic authentication and a freshly generated label.
+     */
+    private HttpPut newLoadRequest() {
+        HttpPut put = new HttpPut(loadUrl);
+        put.removeHeaders(HttpHeaders.CONTENT_LENGTH);
+        put.removeHeaders(HttpHeaders.TRANSFER_ENCODING);
+        put.setHeader(HttpHeaders.EXPECT, "100-continue");
+        put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(USER, PASSWD));
+        // Stream load properties are passed as headers; a random UUID gives each load its own label.
+        put.setHeader("label", UUID.randomUUID().toString());
+        return put;
+    }
+
+    /**
+     * Sends a stream load request with a new HTTP client and prints the response body.
+     *
+     * @param put the fully configured request
+     * @throws IOException if the request fails or the HTTP status is not 200
+     */
+    private void execute(HttpPut put) throws IOException {
+        try (CloseableHttpClient client = httpClientBuilder.build();
+             CloseableHttpResponse response = client.execute(put)) {
+            String loadResult = "";
+            if (response.getEntity() != null) {
+                loadResult = EntityUtils.toString(response.getEntity());
+            }
+
+            final int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode != 200) {
+                throw new IOException(String.format("Stream load failed. status: %s load result: %s", statusCode, loadResult));
+            }
+            // NOTE(review): only the HTTP status is checked here; Doris may still report a failed
+            // load in the response body, so inspect the printed result (e.g. its "Status" field).
+            System.out.println("Get load result: " + loadResult);
+        }
+    }
+
+    /**
+     * Builds the value of an HTTP Basic Authorization header, the authentication scheme used
+     * by Doris here.
+     *
+     * @param username the Doris user name
+     * @param password the user's password (may be empty)
+     * @return {@code "Basic "} followed by the Base64 encoding of {@code username:password}
+     */
+    private String basicAuthHeader(String username, String password) {
+        final String tobeEncode = username + ":" + password;
+        byte[] encoded = Base64.encodeBase64(tobeEncode.getBytes(StandardCharsets.UTF_8));
+        return "Basic " + new String(encoded, StandardCharsets.UTF_8);
+    }
+
+
+    public static void main(String[] args) throws Exception {
+        DorisStreamLoader loader = new DorisStreamLoader();
+        // CSV file load: uncomment to import the file at LOAD_FILE_NAME.
+        //File file = new File(LOAD_FILE_NAME);
+        //loader.load(file);
+        // JSON load
+        String jsonData = "{\"id\":556393582,\"number\":\"123344\",\"price\":\"23.5\",\"skuname\":\"test\",\"skudesc\":\"zhangfeng_test,test\"}";
+        loader.loadJson(jsonData);
+
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org