You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/08/27 02:57:18 UTC

[incubator-doris] branch master updated: Doris-spark connector examples (#6485)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ace21eb  Doris-spark connector examples (#6485)
ace21eb is described below

commit ace21ebf83bb646f047574563e9bec1fbc9fc47f
Author: 月眸 <11...@qq.com>
AuthorDate: Fri Aug 27 10:57:11 2021 +0800

    Doris-spark connector examples (#6485)
    
    * doris spark connector examples
    
    * add usage documentation and license
    
    Co-authored-by: shengy <whyMy2017>
---
 samples/doris-demo/spark-demo/docs/instructions.md |  74 +++++++++++++
 samples/doris-demo/spark-demo/pom.xml              |  37 +++++++
 .../spark/DorisSparkConnectionExampleJava.java     |  99 ++++++++++++++++++
 .../spark/DorisSparkConnectionExampleScala.scala   | 116 +++++++++++++++++++++
 4 files changed, 326 insertions(+)

diff --git a/samples/doris-demo/spark-demo/docs/instructions.md b/samples/doris-demo/spark-demo/docs/instructions.md
new file mode 100644
index 0000000..0fda208
--- /dev/null
+++ b/samples/doris-demo/spark-demo/docs/instructions.md
@@ -0,0 +1,74 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Instructions for use
+
+These samples show how to use Spark and the Spark Doris connector to read data from a Doris table.
+Code examples are provided in both Java and Scala:
+
+```java
+org.apache.doris.demo.spark.DorisSparkConnectionExampleJava
+org.apache.doris.demo.spark.DorisSparkConnectionExampleScala
+```
+
+**Note:** The Spark Doris connector jar is not published to the Maven Central repository, so you need to compile it yourself and add it to the classpath of your project. For how to compile and use the Spark Doris connector, see:
+
+[Spark doris connector](https://doris.apache.org/master/zh-CN/extending-doris/spark-doris-connector.html)
+
+
+1. First, create a table in doris with any mysql client
+
+   ```sql
+   CREATE TABLE `example_table` (
+   `id` bigint(20) NOT NULL COMMENT "ID",
+   `name` varchar(100) NOT NULL COMMENT "Name",
+   `age` int(11) NOT NULL COMMENT "Age"
+   ) ENGINE = OLAP
+   UNIQUE KEY(`id`)
+   COMMENT "example table"
+   DISTRIBUTED BY HASH(`id`) BUCKETS 1
+   PROPERTIES (
+   "replication_num" = "1",
+   "in_memory" = "false",
+   "storage_format" = "V2"
+   );
+   ```
+
+2. Insert some test data into example_table
+
+   ```sql
+   insert into example_table values(1,"xx1",21);
+   insert into example_table values(2,"xx2",21);
+   ```
+
+3. Set the Doris connection settings in the example class you want to run
+
+   Change the DORIS_DB, DORIS_TABLE, DORIS_FE_IP, DORIS_FE_HTTP_PORT,
+   DORIS_FE_QUERY_PORT, DORIS_USER and DORIS_PASSWORD constants to match your Doris cluster
+
+4. Run this class, you should see the output:
+
+   ```shell
+    +---+----+---+
+    | id|name|age|
+    +---+----+---+
+    |  1| xx1| 21|
+    |  2| xx2| 21|
+    +---+----+---+
+   ```
diff --git a/samples/doris-demo/spark-demo/pom.xml b/samples/doris-demo/spark-demo/pom.xml
index 6190b12..049b97e 100644
--- a/samples/doris-demo/spark-demo/pom.xml
+++ b/samples/doris-demo/spark-demo/pom.xml
@@ -32,6 +32,43 @@ under the License.
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
+        <spark.version>2.3.4</spark.version>
+        <scala.version>2.11</scala.version>
+        <scala-library.version>2.11.12</scala-library.version>
     </properties>
 
+    <dependencies>
+        <!-- doris spark -->
+        <dependency>
+            <groupId>org.apache</groupId>
+            <artifactId>doris-spark</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
+
+        <!-- spark -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.version}</artifactId>
+            <version>${spark.version}</version>
+        </dependency>
+
+        <!-- scala -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala-library.version}</version>
+        </dependency>
+
+        <!-- mysql connector -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>8.0.23</version>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/samples/doris-demo/spark-demo/src/main/java/org/apache/doris/demo/spark/DorisSparkConnectionExampleJava.java b/samples/doris-demo/spark-demo/src/main/java/org/apache/doris/demo/spark/DorisSparkConnectionExampleJava.java
new file mode 100644
index 0000000..53f0b62
--- /dev/null
+++ b/samples/doris-demo/spark-demo/src/main/java/org/apache/doris/demo/spark/DorisSparkConnectionExampleJava.java
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.spark;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Java demo for the Doris Spark connector.
+ *
+ * <p>It shows three ways to read a Doris table with Spark:
+ * <ul>
+ *   <li>{@link #readWithDataFrame(SparkSession)} - the connector's "doris" DataFrame source</li>
+ *   <li>{@link #readWithSparkSql(SparkSession)} - a Spark SQL temporary view backed by the connector</li>
+ *   <li>{@link #readWithJdbc(SparkSession)} - Spark's generic "jdbc" source with a jdbc:mysql URL</li>
+ * </ul>
+ *
+ * <p>Before you run this class, you need to build doris-spark
+ * and put the doris-spark jar file in your maven repository.
+ * Replace the placeholder constants below with the values of your own Doris cluster.
+ */
+public class DorisSparkConnectionExampleJava {
+
+    /** Doris database that contains the table to read. */
+    private static final String DORIS_DB = "demo";
+
+    /** Doris table to read. */
+    private static final String DORIS_TABLE = "example_table";
+
+    /** Host of the Doris FE node (placeholder, must be replaced). */
+    private static final String DORIS_FE_IP = "your doris fe ip";
+
+    /** FE HTTP port, used for the connector's "fenodes" option. */
+    private static final String DORIS_FE_HTTP_PORT = "8030";
+
+    /** FE query port, used in the jdbc:mysql URL. */
+    private static final String DORIS_FE_QUERY_PORT = "9030";
+
+    /** Doris user name (placeholder, must be replaced). */
+    private static final String DORIS_USER = "your doris user";
+
+    /** Doris password (placeholder, must be replaced). */
+    private static final String DORIS_PASSWORD = "your doris password";
+
+    /**
+     * Creates a local SparkSession, runs one of the read examples and stops the session.
+     * Uncomment a different call to try another way of reading.
+     *
+     * @param args command line arguments (unused)
+     */
+    public static void main(String[] args) {
+        SparkSession sc = SparkSession.builder().master("local[*]").appName("test").getOrCreate();
+        try {
+            readWithDataFrame(sc);
+            //readWithSparkSql(sc);
+            //readWithJdbc(sc);
+        } finally {
+            // Always release the local Spark resources, even when the read fails.
+            sc.stop();
+        }
+    }
+
+    /**
+     * Reads the Doris table through the connector's "doris" DataFrame source
+     * and prints up to 5 rows.
+     *
+     * @param sc SparkSession
+     */
+    private static void readWithDataFrame(SparkSession sc) {
+        Dataset<Row> df = sc.read().format("doris")
+                .option("doris.table.identifier", String.format("%s.%s", DORIS_DB, DORIS_TABLE))
+                .option("doris.fenodes", String.format("%s:%s", DORIS_FE_IP, DORIS_FE_HTTP_PORT))
+                .option("user", DORIS_USER)
+                .option("password", DORIS_PASSWORD)
+                .load();
+        df.show(5);
+    }
+
+    /**
+     * Registers the Doris table as the temporary view "spark_doris" with Spark SQL,
+     * then queries the view and prints up to 5 rows.
+     *
+     * @param sc SparkSession
+     */
+    private static void readWithSparkSql(SparkSession sc) {
+        sc.sql("CREATE TEMPORARY VIEW spark_doris " +
+                "USING doris " +
+                "OPTIONS( " +
+                "  \"table.identifier\"=\"" + DORIS_DB + "." + DORIS_TABLE + "\", " +
+                "  \"fenodes\"=\"" + DORIS_FE_IP + ":" + DORIS_FE_HTTP_PORT + "\", " +
+                "  \"user\"=\"" + DORIS_USER + "\", " +
+                "  \"password\"=\"" + DORIS_PASSWORD + "\" " +
+                ")");
+        sc.sql("select * from spark_doris").show(5);
+    }
+
+    /**
+     * Reads the Doris table through Spark's "jdbc" source, using a jdbc:mysql URL
+     * that points at the FE query port, and prints up to 5 rows.
+     *
+     * @param sc SparkSession
+     */
+    private static void readWithJdbc(SparkSession sc) {
+        Dataset<Row> df = sc.read().format("jdbc")
+                .option("url", String.format("jdbc:mysql://%s:%s/%s?useUnicode=true&characterEncoding=utf-8", DORIS_FE_IP, DORIS_FE_QUERY_PORT, DORIS_DB))
+                .option("dbtable", DORIS_TABLE)
+                .option("user", DORIS_USER)
+                .option("password", DORIS_PASSWORD)
+                .load();
+        df.show(5);
+    }
+}
\ No newline at end of file
diff --git a/samples/doris-demo/spark-demo/src/main/scala/org/apache/doris/demo/spark/DorisSparkConnectionExampleScala.scala b/samples/doris-demo/spark-demo/src/main/scala/org/apache/doris/demo/spark/DorisSparkConnectionExampleScala.scala
new file mode 100644
index 0000000..e0c944d
--- /dev/null
+++ b/samples/doris-demo/spark-demo/src/main/scala/org/apache/doris/demo/spark/DorisSparkConnectionExampleScala.scala
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.spark
+
+import org.apache.doris.spark._
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.SparkSession
+
+/**
+ * Scala demo for the Doris Spark connector.
+ *
+ * It shows four ways to read a Doris table with Spark:
+ *  - `readWithRdd`: the connector's `dorisRDD` function
+ *  - `readWithDataFrame`: the connector's "doris" DataFrame source
+ *  - `readWithSql`: a Spark SQL temporary view backed by the connector
+ *  - `readWithJdbc`: Spark's generic "jdbc" source with a jdbc:mysql URL
+ *
+ * Before you run this object, you need to build doris-spark
+ * and put the doris-spark jar file in your maven repository.
+ * Replace the placeholder values below with those of your own Doris cluster.
+ */
+object DorisSparkConnectionExampleScala {
+
+    /** Doris database that contains the table to read. */
+    val DORIS_DB = "demo"
+
+    /** Doris table to read. */
+    val DORIS_TABLE = "example_table"
+
+    /** Host of the Doris FE node (placeholder, must be replaced). */
+    val DORIS_FE_IP = "your doris fe ip"
+
+    /** FE HTTP port, used for the connector's "fenodes" option. */
+    val DORIS_FE_HTTP_PORT = "8030"
+
+    /** FE query port, used in the jdbc:mysql URL. */
+    val DORIS_FE_QUERY_PORT = "9030"
+
+    /** Doris user name (placeholder, must be replaced). */
+    val DORIS_USER = "your doris user"
+
+    /** Doris password (placeholder, must be replaced). */
+    val DORIS_PASSWORD = "your doris password"
+
+    /**
+     * Runs one of the read examples against a local Spark master.
+     *
+     * @param args command line arguments (unused)
+     */
+    def main(args: Array[String]): Unit = {
+        val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
+        // readWithRdd creates its own SparkContext from sparkConf, so keep the
+        // SparkSession line below commented out while running it. To try one of the
+        // SparkSession based examples instead, comment out readWithRdd(sparkConf) and
+        // uncomment the session line together with the example you want to run.
+        // val sc = SparkSession.builder().config(sparkConf).getOrCreate()
+        readWithRdd(sparkConf)
+        // readWithDataFrame(sc)
+        // readWithSql(sc)
+        // readWithJdbc(sc)
+    }
+
+    /**
+     * Reads the Doris table as a Spark RDD, collects it to the driver and prints
+     * one collected element per line.
+     *
+     * A new SparkContext is created from the given configuration and is always
+     * stopped before this method returns.
+     *
+     * @param sparkConf configuration used to create the SparkContext
+     */
+    def readWithRdd(sparkConf: SparkConf): Unit = {
+        val sparkContext = new SparkContext(sparkConf)
+        try {
+            val rdd = new SparkContextFunctions(sparkContext).dorisRDD(
+                tableIdentifier = Some(s"$DORIS_DB.$DORIS_TABLE"),
+                cfg = Some(Map(
+                    "doris.fenodes" -> s"$DORIS_FE_IP:$DORIS_FE_HTTP_PORT",
+                    "doris.request.auth.user" -> DORIS_USER,
+                    "doris.request.auth.password" -> DORIS_PASSWORD
+                ))
+            )
+            val resultArr = rdd.collect()
+            // A bare mkString would glue all rows together; print one per line instead.
+            println(resultArr.mkString("\n"))
+        } finally {
+            // Release the context even when the read fails.
+            sparkContext.stop()
+        }
+    }
+
+    /**
+     * Reads the Doris table through the connector's "doris" DataFrame source
+     * and prints up to 5 rows.
+     *
+     * @param sc SparkSession
+     */
+    def readWithDataFrame(sc: SparkSession): Unit = {
+        val df = sc.read.format("doris")
+            .option("doris.table.identifier", s"$DORIS_DB.$DORIS_TABLE")
+            .option("doris.fenodes", s"$DORIS_FE_IP:$DORIS_FE_HTTP_PORT")
+            .option("user", DORIS_USER)
+            .option("password", DORIS_PASSWORD)
+            .load()
+        df.show(5)
+    }
+
+    /**
+     * Registers the Doris table as the temporary view "spark_doris" with Spark SQL,
+     * then queries the view and prints up to 5 rows.
+     *
+     * @param sc SparkSession
+     */
+    def readWithSql(sc: SparkSession): Unit = {
+        sc.sql("CREATE TEMPORARY VIEW spark_doris\n" +
+            "USING doris " +
+            "OPTIONS( " +
+            "  \"table.identifier\"=\"" + DORIS_DB + "." + DORIS_TABLE + "\", " +
+            "  \"fenodes\"=\"" + DORIS_FE_IP + ":" + DORIS_FE_HTTP_PORT + "\", " +
+            "  \"user\"=\"" + DORIS_USER + "\", " +
+            "  \"password\"=\"" + DORIS_PASSWORD + "\" " +
+            ")")
+        sc.sql("select * from spark_doris").show(5)
+    }
+
+    /**
+     * Reads the Doris table through Spark's "jdbc" source, using a jdbc:mysql URL
+     * that points at the FE query port, and prints up to 5 rows.
+     *
+     * @param sc SparkSession
+     */
+    def readWithJdbc(sc: SparkSession): Unit = {
+        val df = sc.read.format("jdbc")
+            .option("url", s"jdbc:mysql://$DORIS_FE_IP:$DORIS_FE_QUERY_PORT/$DORIS_DB?useUnicode=true&characterEncoding=utf-8")
+            .option("dbtable", DORIS_TABLE)
+            .option("user", DORIS_USER)
+            .option("password", DORIS_PASSWORD)
+            .load()
+        df.show(5)
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org