Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/01/10 08:47:48 UTC

[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-2222] bugs of Spark Connector (#4678)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.12 by this push:
     new 7c1b88d  [To rel/0.12][IOTDB-2222] bugs of Spark Connector (#4678)
7c1b88d is described below

commit 7c1b88dd5f79d07fc0ce2865e4d72d5836291cc2
Author: Xuan Ronaldo <xu...@qq.com>
AuthorDate: Mon Jan 10 16:47:15 2022 +0800

    [To rel/0.12][IOTDB-2222] bugs of Spark Connector (#4678)
---
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md |  8 +--
 .../UserGuide/Ecosystem Integration/Spark IoTDB.md | 20 ++++----
 pom.xml                                            |  2 +-
 spark-iotdb-connector/pom.xml                      | 16 ++++--
 .../org/apache/iotdb/spark/db/DataFrameTools.scala | 58 ++++++++++++++++------
 .../org/apache/iotdb/spark/db/DefaultSource.scala  |  2 +-
 .../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala |  8 ++-
 .../org/apache/iotdb/spark/db/Transformer.scala    | 18 +++----
 .../org/apache/iotdb/spark/db/IoTDBWriteTest.scala |  6 ++-
 .../DataFrameToolsTest.scala}                      | 49 ++++++------------
 10 files changed, 105 insertions(+), 82 deletions(-)
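
Note on the DataFrameTools.scala change below: after sorting each partition by Device, the new write loop flushes the buffered rows whenever the device changes or the buffer reaches batchSize, and once more at the end of the partition. The following is a minimal, self-contained sketch of that flushing rule only. It is not the connector's code: `BatchSketch`, `Rec`, and `batches` are hypothetical names, and the sketch returns the batches instead of calling `session.insertRecordsOfOneDevice`. Unlike the patch, it also skips empty buffers on a device change.

```scala
object BatchSketch {
  // Hypothetical row type standing in for a narrow-form record: (Time, Device, one value).
  final case class Rec(time: Long, device: String, value: Int)

  // Returns, in order, the (device, rows) batches that would be flushed.
  // Assumes `records` is already sorted by device, as sortWithinPartitions ensures in the patch.
  def batches(records: Iterator[Rec], batchSize: Int): Vector[(String, Vector[Rec])] = {
    val out = Vector.newBuilder[(String, Vector[Rec])]
    var device = ""
    var buf = Vector.empty[Rec]

    // Emit the current buffer for the current device, then clear it.
    def flush(): Unit = if (buf.nonEmpty) {
      out += ((device, buf))
      buf = Vector.empty
    }

    records.foreach { r =>
      // Device changed: flush what was buffered for the previous device.
      if (r.device != device) {
        flush()
        device = r.device
      }
      buf = buf :+ r
      // Buffer full: flush so that no single insert exceeds batchSize rows.
      if (buf.size >= batchSize) flush()
    }

    // Flush whatever remains at the end of the partition.
    flush()
    out.result()
  }
}
```

With a batch size of 2, three rows for one device followed by one row for another would yield three batches: two for the first device and one for the second. Each batch belongs to exactly one device, which is what a per-device insert call requires.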

diff --git a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
index 9a62e22..ae20b85 100644
--- a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md	
+++ b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md	
@@ -25,7 +25,7 @@ The versions required for Spark and Java are as follow:
 
 | Spark Version | Scala Version | Java Version | TsFile |
 | :-------------: | :-------------: | :------------: |:------------: |
-| `2.4.3`        | `2.11`        | `1.8`        | `0.12.0`|
+| `2.4.5`        | `2.12`        | `1.8`        | `0.12.0`|
 
 
 > Currently we only support spark version 2.4.3 and there are some known issue on 2.4.7, do no use it
@@ -41,7 +41,7 @@ mvn clean scala:compile compile install
     <dependency>
       <groupId>org.apache.iotdb</groupId>
       <artifactId>spark-iotdb-connector</artifactId>
-      <version>0.10.0</version>
+      <version>0.12.4</version>
     </dependency>
 ```
 
@@ -201,8 +201,10 @@ val dfWithColumn = df.withColumnRenamed("_1", "Time")
     .withColumnRenamed("_7", "root.test.d0.s5")
 dfWithColumn.write.format("org.apache.iotdb.spark.db")
     .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+    .option("numPartition", "10")
     .save
 ```
 
 ### Notes
-1. You can directly write data to IoTDB whatever the dataframe contains a wide table or a narrow table.
\ No newline at end of file
+1. You can directly write data to IoTDB whatever the dataframe contains a wide table or a narrow table.
+2. The parameter `numPartition` sets the number of partitions. The dataframe to be saved is repartitioned according to this parameter before the data is written. Each partition opens its own session to write data, which increases the number of concurrent requests.
\ No newline at end of file
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
index c83f0b1..3923362 100644
--- a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md	
+++ b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md	
@@ -27,7 +27,7 @@ Spark和Java所需的版本如下:
 
 | Spark Version | Scala Version | Java Version | TsFile   |
 | ------------- | ------------- | ------------ | -------- |
-| `2.4.3`       | `2.11`        | `1.8`        | `0.12.0` |
+| `2.4.5`       | `2.12`        | `1.8`        | `0.12.0` |
 
 ### 安装
 
@@ -39,14 +39,14 @@ mvn clean scala:compile compile install
     <dependency>
       <groupId>org.apache.iotdb</groupId>
       <artifactId>spark-iotdb-connector</artifactId>
-      <version>0.10.0</version>
+      <version>0.12.4</version>
     </dependency>
 ```
 
 #### Spark-shell用户指南
 
 ```
-spark-shell --jars spark-iotdb-connector-0.12.0.jar,iotdb-jdbc-0.12.0-jar-with-dependencies.jar
+spark-shell --jars spark-iotdb-connector-0.12.4.jar,iotdb-jdbc-0.12.4-jar-with-dependencies.jar
 
 import org.apache.iotdb.spark.db._
 
@@ -88,11 +88,11 @@ TsFile中的现有数据如下:
  * d1:root.ln.wf01.wt01
  * d2:root.ln.wf02.wt02
 
-time|d1.status|time|d1.temperature |time	| d2.hardware	|time|d2.status
----- | ---- | ---- | ---- | ---- | ----  | ---- | ---- | ---- 
-1|True	|1|2.2|2|"aaa"|1|True
-3|True	|2|2.2|4|"bbb"|2|False
-5|False|3	|2.1|6	|"ccc"|4|True
+| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
+| ---- | --------- | ---- | -------------- | ---- | ----------- | ---- | --------- |
+| 1    | True      | 1    | 2.2            | 2    | "aaa"       | 1    | True      |
+| 3    | True      | 2    | 2.2            | 4    | "bbb"       | 2    | False     |
+| 5    | False     | 3    | 2.1            | 6    | "ccc"       | 4    | True      |
 
 
 宽(默认)表形式如下:
@@ -205,8 +205,10 @@ val dfWithColumn = df.withColumnRenamed("_1", "Time")
     .withColumnRenamed("_7", "root.test.d0.s5")
 dfWithColumn.write.format("org.apache.iotdb.spark.db")
     .option("url", "jdbc:iotdb://127.0.0.1:6667/")
+	.option("numPartition", "10")
     .save
 ```
 
 ### 注意
-1. 无论dataframe中存放的是窄表还是宽表,都可以直接将数据写到IoTDB中。
\ No newline at end of file
+1. 无论dataframe中存放的是窄表还是宽表,都可以直接将数据写到IoTDB中。
+2. numPartition参数是用来设置分区数,会在写入数据之前给dataframe进行重分区。每一个分区都会开启一个session进行数据的写入,来提高并发数。
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6e03cf7..91e446c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,7 +118,7 @@
         <slf4j.version>1.7.12</slf4j.version>
         <logback.version>1.2.10</logback.version>
         <joda.version>2.9.9</joda.version>
-        <spark.version>2.4.3</spark.version>
+        <spark.version>2.4.5</spark.version>
         <flink.version>1.11.1</flink.version>
         <common.io.version>2.5</common.io.version>
         <commons.collections4>4.4</commons.collections4>
diff --git a/spark-iotdb-connector/pom.xml b/spark-iotdb-connector/pom.xml
index 91f31ae..dba810c 100644
--- a/spark-iotdb-connector/pom.xml
+++ b/spark-iotdb-connector/pom.xml
@@ -32,8 +32,8 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <compile.version>1.8</compile.version>
-        <scala.library.version>2.11</scala.library.version>
-        <scala.version>2.11.12</scala.version>
+        <scala.library.version>2.12.10</scala.library.version>
+        <scala.version>2.12.10</scala.version>
     </properties>
     <dependencies>
         <dependency>
@@ -60,13 +60,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-core_2.11</artifactId>
+            <artifactId>spark-core_2.12</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-sql_2.11</artifactId>
+            <artifactId>spark-sql_2.12</artifactId>
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
@@ -77,10 +77,16 @@
         </dependency>
         <dependency>
             <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_2.11</artifactId>
+            <artifactId>scalatest_2.12</artifactId>
+            <version>3.0.2</version>
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.6.7</version>
+        </dependency>
+        <dependency>
             <groupId>net.jpountz.lz4</groupId>
             <artifactId>lz4</artifactId>
             <version>1.3.0</version>
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala
index adc08b6..a6f00a2 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala
@@ -22,25 +22,20 @@ package org.apache.iotdb.spark.db
 import org.apache.iotdb.session.Session
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.{BOOLEAN, DOUBLE, FLOAT, INT32, INT64, TEXT}
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
 
 import java.util
 import java.lang
 
 object DataFrameTools {
   def insertDataFrame(options: IoTDBOptions, dataframe: DataFrame): Unit = {
-    val filteredColumns = Array[String]("Time", "device_name")
+    val filteredColumns = Array[String]("Time", "Device")
     val sensorTypes = dataframe.dtypes.filter(x => !filteredColumns.contains(x._1))
 
-    val devices = dataframe
-      .select("device_name")
-      .distinct()
-      .collect()
-      .map(x => x.get(0))
-
     dataframe
-      .repartition(devices.length, dataframe.col("device_name"))
-      .foreachPartition { partition =>
+      .repartition(options.numPartition.toInt)
+      .sortWithinPartitions(dataframe.col("Device"))
+      .foreachPartition { (partition: Iterator[Row]) =>
         val hostPort = options.url.split("//")(1).replace("/", "").split(":")
         val session = new Session(
           hostPort(0),
@@ -50,13 +45,22 @@ object DataFrameTools {
         )
         session.open()
 
+        var device: lang.String = ""
         val times = new util.ArrayList[lang.Long]()
         val measurementsList = new util.ArrayList[util.List[lang.String]]()
         val typesList = new util.ArrayList[util.List[TSDataType]]()
         val valuesList = new util.ArrayList[util.List[Object]]()
-        var device: lang.String = ""
+
+        val batchSize = 1000
+        var currentSize = 0
+
         partition.foreach { record =>
           if ("".equals(device)) device = record.get(1).toString
+          else if (!device.equals(record.get(1).toString)) {
+            insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
+            device = record.get(1).toString
+            currentSize = 0
+          }
           val measurements = new util.ArrayList[lang.String]()
           val types = new util.ArrayList[TSDataType]()
           val values = new util.ArrayList[Object]()
@@ -67,13 +71,22 @@ object DataFrameTools {
             measurements.add(sensorTypes(i - 2)._1)
             types.add(getType(sensorTypes(i - 2)._2))
           }
-          times.add(record.get(0).asInstanceOf[Long])
-          measurementsList.add(measurements)
-          typesList.add(types)
-          valuesList.add(values)
+          if (!values.isEmpty) {
+            times.add(record.get(0).asInstanceOf[Long])
+            measurementsList.add(measurements)
+            typesList.add(types)
+            valuesList.add(values)
+            currentSize += 1
+          }
+          if (currentSize >= batchSize) {
+            insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
+            currentSize = 0
+          }
         }
 
-        session.insertRecordsOfOneDevice(device, times, measurementsList, typesList, valuesList)
+        if (!valuesList.isEmpty) {
+          insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
+        }
         session.close()
       }
 
@@ -102,4 +115,17 @@ object DataFrameTools {
       case _ => null
     }
   }
+
+  def insertAndEmptyDataSet(session: Session,
+                            device: lang.String,
+                            times: util.ArrayList[lang.Long],
+                            measurementsList: util.ArrayList[util.List[lang.String]],
+                            typesList: util.ArrayList[util.List[TSDataType]],
+                            valuesList: util.ArrayList[util.List[Object]]): Unit = {
+    session.insertRecordsOfOneDevice(device, times, measurementsList, typesList, valuesList)
+    times.clear()
+    measurementsList.clear()
+    typesList.clear()
+    valuesList.clear()
+  }
 }
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
index d5238d9..3174918 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
@@ -48,7 +48,7 @@ private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegis
     }
     val iotdbOptions = new IoTDBOptions(parameters)
 
-    if (!data.columns.contains("device_name")) {
+    if (!data.columns.contains("Device")) {
       data.columns.foreach(column => if (!column.startsWith("root.") && column != "Time") sys.error("Invalidate column: " + column))
       val narrowDf = Transformer.toNarrowForm(sqlContext.sparkSession, data)
       DataFrameTools.insertDataFrame(iotdbOptions, narrowDf)
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
index 7c811e4..a114f0c 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
@@ -20,11 +20,11 @@
 package org.apache.iotdb.spark.db
 
 import java.sql.{Connection, DriverManager, ResultSet, Statement}
-
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
+import org.apache.spark.util.TaskCompletionListener
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 
 
@@ -61,7 +61,11 @@ class IoTDBRDD private[iotdb](
 
     var taskInfo: String = _
     Option(TaskContext.get()).foreach { taskContext =>
-      taskContext.addTaskCompletionListener { _ => conn.close() }
+      taskContext.addTaskCompletionListener(new TaskCompletionListener {
+        override def onTaskCompletion(context: TaskContext): Unit = {
+          conn.close()
+        }
+      })
       taskInfo = "task Id: " + taskContext.taskAttemptId() + " partition Id: " + taskContext.partitionId()
     }
 
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
index 16d053a..ab0a397 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
@@ -36,7 +36,7 @@ object Transformer {
     *              +---------+-------------+-------------+-------------+-------------+-------------+-------------+
     * @return tansferred data frame
     *         +---------+-----------+---+---+----+
-    *         |timestamp|device_name| m1| m2|  m3|
+    *         |timestamp|Device| m1| m2|  m3|
     *         +---------+-----------+---+---+----+
     *         |        1| root.ln.d2| 21| 22|  23|
     *         |        1| root.ln.d1| 11| 12|null|
@@ -49,7 +49,7 @@ object Transformer {
     // use to record all the measurement, prepare for the union
     var mMap = scala.collection.mutable.HashMap[String, DataType]()
 
-    // this step is to record device_name and measurement_name
+    // this step is to record Device and measurement_name
     df.schema.foreach(f => {
       if (!SQLConstant.TIMESTAMP_STR.equals(f.name)) {
         val pos = f.name.lastIndexOf('.')
@@ -69,7 +69,7 @@ object Transformer {
 
     // we first get each device's measurement data and then union them to get what we want, means:
     // +---------+-----------+---+---+----+
-    // |timestamp|device_name| m1| m2|  m3|
+    // |timestamp|Device| m1| m2|  m3|
     // +---------+-----------+---+---+----+
     // |        1| root.ln.d2| 21| 22|  23|
     // |        1| root.ln.d1| 11| 12|null|
@@ -77,7 +77,7 @@ object Transformer {
     var res: org.apache.spark.sql.DataFrame = null
     map.keys.foreach { deviceName =>
       // build query
-      var query = "select " + SQLConstant.TIMESTAMP_STR + ", \"" + deviceName + "\" as device_name"
+      var query = "select " + SQLConstant.TIMESTAMP_STR + ", \"" + deviceName + "\" as Device"
       val measurementName = map(deviceName)
       mMap.keySet.foreach { m =>
         val pos = measurementName.indexOf(m)
@@ -111,7 +111,7 @@ object Transformer {
     * @param spark your SparkSession
     * @param df    dataFrame need to be tansfer
     *              +---------+-----------+---+---+----+
-    *              |timestamp|device_name| m1| m2|  m3|
+    *              |timestamp|Device| m1| m2|  m3|
     *              +---------+-----------+---+---+----+
     *              |        1| root.ln.d2| 21| 22|  23|
     *              |        1| root.ln.d1| 11| 12|null|
@@ -126,8 +126,8 @@ object Transformer {
     */
   def toWideForm(spark: SparkSession, df: DataFrame): DataFrame = {
     df.createOrReplaceTempView("tsfle_narrow_form")
-    // get all device_name
-    val deviceNames = spark.sql("select distinct device_name from tsfle_narrow_form").collect()
+    // get all Device
+    val deviceNames = spark.sql("select distinct Device from tsfle_narrow_form").collect()
     val tableDF = spark.sql("select * from tsfle_narrow_form")
 
     import scala.collection.mutable.ListBuffer
@@ -135,7 +135,7 @@ object Transformer {
     val measurementNames = new ListBuffer[String]()
 
     tableDF.schema.foreach(f => {
-      if (!SQLConstant.TIMESTAMP_STR.equals(f.name) && !"device_name".equals(f.name)) {
+      if (!SQLConstant.TIMESTAMP_STR.equals(f.name) && !"Device".equals(f.name)) {
         measurementNames += f.name
       }
     })
@@ -155,7 +155,7 @@ object Transformer {
         query = query + ", " + measurementName + " as `" + deviceName(0) + "." + measurementName + "`"
       })
 
-      query = query + " from tsfle_narrow_form where device_name = \"" + deviceName(0) + "\""
+      query = query + " from tsfle_narrow_form where Device = \"" + deviceName(0) + "\""
       val curDF = spark.sql(query)
 
       if (res == null) {
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
index ae9e41c..9bbea99 100644
--- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
@@ -82,9 +82,10 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
       .option("url", "jdbc:iotdb://127.0.0.1:6667/")
       .save
 
-    val result = session.executeQueryStatement("select ** from root")
+    val result = session.executeQueryStatement("select * from root")
     var size = 0
     while (result.hasNext) {
+      result.next()
       size += 1
     }
     assertResult(2)(size)
@@ -96,7 +97,7 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
       (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
 
     val dfWithColumn = df.withColumnRenamed("_1", "Time")
-      .withColumnRenamed("_2", "device_name")
+      .withColumnRenamed("_2", "Device")
       .withColumnRenamed("_3", "s0")
       .withColumnRenamed("_4", "s1")
       .withColumnRenamed("_5", "s2")
@@ -110,6 +111,7 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
     val result = session.executeQueryStatement("select * from root")
     var size = 0
     while (result.hasNext) {
+      result.next()
       size += 1
     }
     assertResult(2)(size)
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/unit/DataFrameToolsTest.scala
similarity index 67%
copy from spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
copy to spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/unit/DataFrameToolsTest.scala
index ae9e41c..d7ea563 100644
--- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/unit/DataFrameToolsTest.scala
@@ -17,19 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.spark.db
+package org.apache.iotdb.spark.db.unit
 
 import org.apache.iotdb.db.conf.IoTDBConstant
 import org.apache.iotdb.db.service.IoTDB
 import org.apache.iotdb.jdbc.Config
 import org.apache.iotdb.session.Session
+import org.apache.iotdb.spark.db.{DataFrameTools, EnvironmentUtils, IoTDBOptions}
 import org.apache.spark.sql.SparkSession
-import org.junit.{AfterClass, Before}
+import org.junit._
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
-  private var daemon: IoTDB = _
+class DataFrameToolsTest extends FunSuite with BeforeAndAfterAll {
+
   private var spark: SparkSession = _
+  private var daemon: IoTDB = _
   private var session: Session = _
 
   @Before
@@ -46,7 +48,7 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
     spark = SparkSession
       .builder()
       .config("spark.master", "local")
-      .appName("TSFile test")
+      .appName("unit test")
       .getOrCreate()
 
     session = new Session("127.0.0.1", 6667, "root", "root")
@@ -66,50 +68,29 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
     super.afterAll()
   }
 
-  test("test insert wide data") {
-    val df = spark.createDataFrame(List(
-      (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
-      (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
-
-    val dfWithColumn = df.withColumnRenamed("_1", "Time")
-      .withColumnRenamed("_2", "root.test.d0.s0")
-      .withColumnRenamed("_3", "root.test.d0.s1")
-      .withColumnRenamed("_4", "root.test.d0.s2")
-      .withColumnRenamed("_5", "root.test.d0.s3")
-      .withColumnRenamed("_6", "root.test.d0.s4")
-      .withColumnRenamed("_7", "root.test.d0.s5")
-    dfWithColumn.write.format("org.apache.iotdb.spark.db")
-      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .save
-
-    val result = session.executeQueryStatement("select ** from root")
-    var size = 0
-    while (result.hasNext) {
-      size += 1
-    }
-    assertResult(2)(size)
-  }
-
-  test("test insert narrow data") {
+  test("test insertDataFrame method") {
     val df = spark.createDataFrame(List(
       (1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
       (2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
 
     val dfWithColumn = df.withColumnRenamed("_1", "Time")
-      .withColumnRenamed("_2", "device_name")
+      .withColumnRenamed("_2", "Device")
       .withColumnRenamed("_3", "s0")
       .withColumnRenamed("_4", "s1")
       .withColumnRenamed("_5", "s2")
       .withColumnRenamed("_6", "s3")
       .withColumnRenamed("_7", "s4")
       .withColumnRenamed("_8", "s5")
-    dfWithColumn.write.format("org.apache.iotdb.spark.db")
-      .option("url", "jdbc:iotdb://127.0.0.1:6667/")
-      .save
+
+    val optionsMap = Map("url" -> "jdbc:iotdb://127.0.0.1:6667/", "numPartition" -> "1")
+    val options = new IoTDBOptions(optionsMap)
+
+    DataFrameTools.insertDataFrame(options ,dfWithColumn)
 
     val result = session.executeQueryStatement("select * from root")
     var size = 0
     while (result.hasNext) {
+      result.next()
       size += 1
     }
     assertResult(2)(size)