Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/01/10 08:47:48 UTC
[iotdb] branch rel/0.12 updated: [To rel/0.12][IOTDB-2222] bugs of Spark Connector (#4678)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.12 by this push:
new 7c1b88d [To rel/0.12][IOTDB-2222] bugs of Spark Connector (#4678)
7c1b88d is described below
commit 7c1b88dd5f79d07fc0ce2865e4d72d5836291cc2
Author: Xuan Ronaldo <xu...@qq.com>
AuthorDate: Mon Jan 10 16:47:15 2022 +0800
[To rel/0.12][IOTDB-2222] bugs of Spark Connector (#4678)
---
.../UserGuide/Ecosystem Integration/Spark IoTDB.md | 8 +--
.../UserGuide/Ecosystem Integration/Spark IoTDB.md | 20 ++++----
pom.xml | 2 +-
spark-iotdb-connector/pom.xml | 16 ++++--
.../org/apache/iotdb/spark/db/DataFrameTools.scala | 58 ++++++++++++++++------
.../org/apache/iotdb/spark/db/DefaultSource.scala | 2 +-
.../scala/org/apache/iotdb/spark/db/IoTDBRDD.scala | 8 ++-
.../org/apache/iotdb/spark/db/Transformer.scala | 18 +++----
.../org/apache/iotdb/spark/db/IoTDBWriteTest.scala | 6 ++-
.../DataFrameToolsTest.scala} | 49 ++++++------------
10 files changed, 105 insertions(+), 82 deletions(-)
diff --git a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
index 9a62e22..ae20b85 100644
--- a/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
+++ b/docs/UserGuide/Ecosystem Integration/Spark IoTDB.md
@@ -25,7 +25,7 @@ The versions required for Spark and Java are as follows:
| Spark Version | Scala Version | Java Version | TsFile |
| :-------------: | :-------------: | :------------: |:------------: |
-| `2.4.3` | `2.11` | `1.8` | `0.12.0`|
+| `2.4.5` | `2.12` | `1.8` | `0.12.0`|
> Currently we only support Spark version 2.4.5; there are known issues with 2.4.7, do not use it
@@ -41,7 +41,7 @@ mvn clean scala:compile compile install
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>spark-iotdb-connector</artifactId>
- <version>0.10.0</version>
+ <version>0.12.4</version>
</dependency>
```
@@ -201,8 +201,10 @@ val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .option("numPartition", "10")
.save
```
### Notes
-1. You can directly write data to IoTDB whatever the dataframe contains a wide table or a narrow table.
\ No newline at end of file
+1. You can write data to IoTDB directly, whether the dataframe holds a wide table or a narrow table.
+2. The parameter `numPartition` sets the number of partitions. The dataframe you want to save is repartitioned according to this parameter before the data is written; each partition opens its own session for writing, which increases the number of concurrent requests.
\ No newline at end of file
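For reference, the complete write path with the new option looks roughly like this: a minimal sketch assembled from the documentation example above, assuming a spark-shell session where `spark` is already bound, with an illustrative two-measurement schema (s0, s1) and the default local URL:

```
// Narrow-form dataframe: Time, Device, then one column per measurement.
val df = spark.createDataFrame(List(
    (1L, "root.test.d0", 1.0F, true),
    (2L, "root.test.d0", 2.0F, false)))
  .withColumnRenamed("_1", "Time")
  .withColumnRenamed("_2", "Device")
  .withColumnRenamed("_3", "s0")
  .withColumnRenamed("_4", "s1")

df.write.format("org.apache.iotdb.spark.db")
  .option("url", "jdbc:iotdb://127.0.0.1:6667/")
  .option("numPartition", "10") // repartitioned into 10 partitions, one write session each
  .save
```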
diff --git a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
index c83f0b1..3923362 100644
--- a/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
+++ b/docs/zh/UserGuide/Ecosystem Integration/Spark IoTDB.md
@@ -27,7 +27,7 @@ The versions required for Spark and Java are as follows:
| Spark Version | Scala Version | Java Version | TsFile |
| ------------- | ------------- | ------------ | -------- |
-| `2.4.3` | `2.11` | `1.8` | `0.12.0` |
+| `2.4.5` | `2.12` | `1.8` | `0.12.0` |
### Installation
@@ -39,14 +39,14 @@ mvn clean scala:compile compile install
<dependency>
<groupId>org.apache.iotdb</groupId>
<artifactId>spark-iotdb-connector</artifactId>
- <version>0.10.0</version>
+ <version>0.12.4</version>
</dependency>
```
#### Spark-shell user guide
```
-spark-shell --jars spark-iotdb-connector-0.12.0.jar,iotdb-jdbc-0.12.0-jar-with-dependencies.jar
+spark-shell --jars spark-iotdb-connector-0.12.4.jar,iotdb-jdbc-0.12.4-jar-with-dependencies.jar
import org.apache.iotdb.spark.db._
@@ -88,11 +88,11 @@ The existing data in the TsFile is as follows:
* d1:root.ln.wf01.wt01
* d2:root.ln.wf02.wt02
-time|d1.status|time|d1.temperature |time | d2.hardware |time|d2.status
----- | ---- | ---- | ---- | ---- | ---- | ---- | ---- | ----
-1|True |1|2.2|2|"aaa"|1|True
-3|True |2|2.2|4|"bbb"|2|False
-5|False|3 |2.1|6 |"ccc"|4|True
+| time | d1.status | time | d1.temperature | time | d2.hardware | time | d2.status |
+| ---- | --------- | ---- | -------------- | ---- | ----------- | ---- | --------- |
+| 1 | True | 1 | 2.2 | 2 | "aaa" | 1 | True |
+| 3 | True | 2 | 2.2 | 4 | "bbb" | 2 | False |
+| 5 | False | 3 | 2.1 | 6 | "ccc" | 4 | True |
The wide (default) table form is as follows:
@@ -205,8 +205,10 @@ val dfWithColumn = df.withColumnRenamed("_1", "Time")
.withColumnRenamed("_7", "root.test.d0.s5")
dfWithColumn.write.format("org.apache.iotdb.spark.db")
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
+ .option("numPartition", "10")
.save
```
### Notes
-1. Whether the dataframe holds a narrow table or a wide table, you can write the data directly to IoTDB.
\ No newline at end of file
+1. Whether the dataframe holds a narrow table or a wide table, you can write the data directly to IoTDB.
+2. The `numPartition` parameter sets the number of partitions; the dataframe is repartitioned accordingly before the data is written. Each partition opens its own session for writing, which increases concurrency.
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6e03cf7..91e446c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -118,7 +118,7 @@
<slf4j.version>1.7.12</slf4j.version>
<logback.version>1.2.10</logback.version>
<joda.version>2.9.9</joda.version>
- <spark.version>2.4.3</spark.version>
+ <spark.version>2.4.5</spark.version>
<flink.version>1.11.1</flink.version>
<common.io.version>2.5</common.io.version>
<commons.collections4>4.4</commons.collections4>
diff --git a/spark-iotdb-connector/pom.xml b/spark-iotdb-connector/pom.xml
index 91f31ae..dba810c 100644
--- a/spark-iotdb-connector/pom.xml
+++ b/spark-iotdb-connector/pom.xml
@@ -32,8 +32,8 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<compile.version>1.8</compile.version>
- <scala.library.version>2.11</scala.library.version>
- <scala.version>2.11.12</scala.version>
+ <scala.library.version>2.12.10</scala.library.version>
+ <scala.version>2.12.10</scala.version>
</properties>
<dependencies>
<dependency>
@@ -60,13 +60,13 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
+ <artifactId>spark-core_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_2.11</artifactId>
+ <artifactId>spark-sql_2.12</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
@@ -77,10 +77,16 @@
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
- <artifactId>scalatest_2.11</artifactId>
+ <artifactId>scalatest_2.12</artifactId>
+ <version>3.0.2</version>
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.6.7</version>
+ </dependency>
+ <dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala
index adc08b6..a6f00a2 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DataFrameTools.scala
@@ -22,25 +22,20 @@ package org.apache.iotdb.spark.db
import org.apache.iotdb.session.Session
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType.{BOOLEAN, DOUBLE, FLOAT, INT32, INT64, TEXT}
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Row}
import java.util
import java.lang
object DataFrameTools {
def insertDataFrame(options: IoTDBOptions, dataframe: DataFrame): Unit = {
- val filteredColumns = Array[String]("Time", "device_name")
+ val filteredColumns = Array[String]("Time", "Device")
val sensorTypes = dataframe.dtypes.filter(x => !filteredColumns.contains(x._1))
- val devices = dataframe
- .select("device_name")
- .distinct()
- .collect()
- .map(x => x.get(0))
-
dataframe
- .repartition(devices.length, dataframe.col("device_name"))
- .foreachPartition { partition =>
+ .repartition(options.numPartition.toInt)
+ .sortWithinPartitions(dataframe.col("Device"))
+ .foreachPartition { (partition: Iterator[Row]) =>
val hostPort = options.url.split("//")(1).replace("/", "").split(":")
val session = new Session(
hostPort(0),
@@ -50,13 +45,22 @@ object DataFrameTools {
)
session.open()
+ var device: lang.String = ""
val times = new util.ArrayList[lang.Long]()
val measurementsList = new util.ArrayList[util.List[lang.String]]()
val typesList = new util.ArrayList[util.List[TSDataType]]()
val valuesList = new util.ArrayList[util.List[Object]]()
- var device: lang.String = ""
+
+ val batchSize = 1000
+ var currentSize = 0
+
partition.foreach { record =>
if ("".equals(device)) device = record.get(1).toString
+ else if (!device.equals(record.get(1).toString)) {
+ insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
+ device = record.get(1).toString
+ currentSize = 0
+ }
val measurements = new util.ArrayList[lang.String]()
val types = new util.ArrayList[TSDataType]()
val values = new util.ArrayList[Object]()
@@ -67,13 +71,22 @@ object DataFrameTools {
measurements.add(sensorTypes(i - 2)._1)
types.add(getType(sensorTypes(i - 2)._2))
}
- times.add(record.get(0).asInstanceOf[Long])
- measurementsList.add(measurements)
- typesList.add(types)
- valuesList.add(values)
+ if (!values.isEmpty) {
+ times.add(record.get(0).asInstanceOf[Long])
+ measurementsList.add(measurements)
+ typesList.add(types)
+ valuesList.add(values)
+ currentSize += 1
+ }
+ if (currentSize >= batchSize) {
+ insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
+ currentSize = 0
+ }
}
- session.insertRecordsOfOneDevice(device, times, measurementsList, typesList, valuesList)
+ if (!valuesList.isEmpty) {
+ insertAndEmptyDataSet(session, device, times, measurementsList, typesList, valuesList)
+ }
session.close()
}
@@ -102,4 +115,17 @@ object DataFrameTools {
case _ => null
}
}
+
+ def insertAndEmptyDataSet(session: Session,
+ device: lang.String,
+ times: util.ArrayList[lang.Long],
+ measurementsList: util.ArrayList[util.List[lang.String]],
+ typesList: util.ArrayList[util.List[TSDataType]],
+ valuesList: util.ArrayList[util.List[Object]]): Unit = {
+ session.insertRecordsOfOneDevice(device, times, measurementsList, typesList, valuesList)
+ times.clear()
+ measurementsList.clear()
+ typesList.clear()
+ valuesList.clear()
+ }
}
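Taken together, the rewritten insertDataFrame follows a flush-on-boundary batching pattern: partitions are sorted by Device so rows for one device are contiguous, and the accumulated batch is flushed every 1000 rows, on every device change, and once more at the end. A simplified sketch of that control flow (hypothetical helper names; `flush` stands in for insertAndEmptyDataSet with its accumulated lists):

```
// Simplified restatement of the batching control flow in insertDataFrame.
def writeBatched[T](rows: Iterator[(String, T)], batchSize: Int = 1000)
                   (flush: (String, Seq[T]) => Unit): Unit = {
  var device = ""
  var batch = Vector.empty[T]
  rows.foreach { case (dev, value) =>
    if (device.isEmpty) device = dev
    else if (device != dev) {              // device boundary: flush and switch
      flush(device, batch); batch = Vector.empty; device = dev
    }
    batch :+= value
    if (batch.size >= batchSize) {         // full batch: flush early
      flush(device, batch); batch = Vector.empty
    }
  }
  if (batch.nonEmpty) flush(device, batch) // trailing partial batch
}
```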
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
index d5238d9..3174918 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/DefaultSource.scala
@@ -48,7 +48,7 @@ private[iotdb] class DefaultSource extends RelationProvider with DataSourceRegis
}
val iotdbOptions = new IoTDBOptions(parameters)
- if (!data.columns.contains("device_name")) {
+ if (!data.columns.contains("Device")) {
data.columns.foreach(column => if (!column.startsWith("root.") && column != "Time") sys.error("Invalidate column: " + column))
val narrowDf = Transformer.toNarrowForm(sqlContext.sparkSession, data)
DataFrameTools.insertDataFrame(iotdbOptions, narrowDf)
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
index 7c811e4..a114f0c 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/IoTDBRDD.scala
@@ -20,11 +20,11 @@
package org.apache.iotdb.spark.db
import java.sql.{Connection, DriverManager, ResultSet, Statement}
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
+import org.apache.spark.util.TaskCompletionListener
import org.apache.spark.{Partition, SparkContext, TaskContext}
@@ -61,7 +61,11 @@ class IoTDBRDD private[iotdb](
var taskInfo: String = _
Option(TaskContext.get()).foreach { taskContext =>
- taskContext.addTaskCompletionListener { _ => conn.close() }
+ taskContext.addTaskCompletionListener(new TaskCompletionListener {
+ override def onTaskCompletion(context: TaskContext): Unit = {
+ conn.close()
+ }
+ })
taskInfo = "task Id: " + taskContext.taskAttemptId() + " partition Id: " + taskContext.partitionId()
}
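The listener change is most likely a consequence of the Scala 2.12 migration above: TaskContext.addTaskCompletionListener is overloaded to take either a TaskCompletionListener or a function, and under Scala 2.12 a bare lambda can also be SAM-converted to the listener interface, making the old call ambiguous at compile time. Spelling out the anonymous class resolves it; assuming Spark 2.4's typed `addTaskCompletionListener[U]` overload, an equivalent one-liner would be:

```
// Picks the (TaskContext => U) overload explicitly, avoiding the SAM ambiguity.
taskContext.addTaskCompletionListener[Unit] { _ => conn.close() }
```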
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
index 16d053a..ab0a397 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
@@ -36,7 +36,7 @@ object Transformer {
* +---------+-------------+-------------+-------------+-------------+-------------+-------------+
* @return transferred data frame
* +---------+-----------+---+---+----+
- * |timestamp|device_name| m1| m2| m3|
+ * |timestamp|Device| m1| m2| m3|
* +---------+-----------+---+---+----+
* | 1| root.ln.d2| 21| 22| 23|
* | 1| root.ln.d1| 11| 12|null|
@@ -49,7 +49,7 @@ object Transformer {
// use to record all the measurement, prepare for the union
var mMap = scala.collection.mutable.HashMap[String, DataType]()
- // this step is to record device_name and measurement_name
+ // this step is to record Device and measurement_name
df.schema.foreach(f => {
if (!SQLConstant.TIMESTAMP_STR.equals(f.name)) {
val pos = f.name.lastIndexOf('.')
@@ -69,7 +69,7 @@ object Transformer {
// we first get each device's measurement data and then union them to get what we want, means:
// +---------+-----------+---+---+----+
- // |timestamp|device_name| m1| m2| m3|
+ // |timestamp|Device| m1| m2| m3|
// +---------+-----------+---+---+----+
// | 1| root.ln.d2| 21| 22| 23|
// | 1| root.ln.d1| 11| 12|null|
@@ -77,7 +77,7 @@ object Transformer {
var res: org.apache.spark.sql.DataFrame = null
map.keys.foreach { deviceName =>
// build query
- var query = "select " + SQLConstant.TIMESTAMP_STR + ", \"" + deviceName + "\" as device_name"
+ var query = "select " + SQLConstant.TIMESTAMP_STR + ", \"" + deviceName + "\" as Device"
val measurementName = map(deviceName)
mMap.keySet.foreach { m =>
val pos = measurementName.indexOf(m)
@@ -111,7 +111,7 @@ object Transformer {
* @param spark your SparkSession
* @param df dataFrame that needs to be transferred
* +---------+-----------+---+---+----+
- * |timestamp|device_name| m1| m2| m3|
+ * |timestamp|Device| m1| m2| m3|
* +---------+-----------+---+---+----+
* | 1| root.ln.d2| 21| 22| 23|
* | 1| root.ln.d1| 11| 12|null|
@@ -126,8 +126,8 @@ object Transformer {
*/
def toWideForm(spark: SparkSession, df: DataFrame): DataFrame = {
df.createOrReplaceTempView("tsfle_narrow_form")
- // get all device_name
- val deviceNames = spark.sql("select distinct device_name from tsfle_narrow_form").collect()
+ // get all Device
+ val deviceNames = spark.sql("select distinct Device from tsfle_narrow_form").collect()
val tableDF = spark.sql("select * from tsfle_narrow_form")
import scala.collection.mutable.ListBuffer
@@ -135,7 +135,7 @@ object Transformer {
val measurementNames = new ListBuffer[String]()
tableDF.schema.foreach(f => {
- if (!SQLConstant.TIMESTAMP_STR.equals(f.name) && !"device_name".equals(f.name)) {
+ if (!SQLConstant.TIMESTAMP_STR.equals(f.name) && !"Device".equals(f.name)) {
measurementNames += f.name
}
})
@@ -155,7 +155,7 @@ object Transformer {
query = query + ", " + measurementName + " as `" + deviceName(0) + "." + measurementName + "`"
})
- query = query + " from tsfle_narrow_form where device_name = \"" + deviceName(0) + "\""
+ query = query + " from tsfle_narrow_form where Device = \"" + deviceName(0) + "\""
val curDF = spark.sql(query)
if (res == null) {
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
index ae9e41c..9bbea99 100644
--- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
@@ -82,9 +82,10 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
.option("url", "jdbc:iotdb://127.0.0.1:6667/")
.save
- val result = session.executeQueryStatement("select ** from root")
+ val result = session.executeQueryStatement("select * from root")
var size = 0
while (result.hasNext) {
+ result.next()
size += 1
}
assertResult(2)(size)
@@ -96,7 +97,7 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
- .withColumnRenamed("_2", "device_name")
+ .withColumnRenamed("_2", "Device")
.withColumnRenamed("_3", "s0")
.withColumnRenamed("_4", "s1")
.withColumnRenamed("_5", "s2")
@@ -110,6 +111,7 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
val result = session.executeQueryStatement("select * from root")
var size = 0
while (result.hasNext) {
+ result.next()
size += 1
}
assertResult(2)(size)
diff --git a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/unit/DataFrameToolsTest.scala
similarity index 67%
copy from spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
copy to spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/unit/DataFrameToolsTest.scala
index ae9e41c..d7ea563 100644
--- a/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/IoTDBWriteTest.scala
+++ b/spark-iotdb-connector/src/test/scala/org/apache/iotdb/spark/db/unit/DataFrameToolsTest.scala
@@ -17,19 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.spark.db
+package org.apache.iotdb.spark.db.unit
import org.apache.iotdb.db.conf.IoTDBConstant
import org.apache.iotdb.db.service.IoTDB
import org.apache.iotdb.jdbc.Config
import org.apache.iotdb.session.Session
+import org.apache.iotdb.spark.db.{DataFrameTools, EnvironmentUtils, IoTDBOptions}
import org.apache.spark.sql.SparkSession
-import org.junit.{AfterClass, Before}
+import org.junit._
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
- private var daemon: IoTDB = _
+class DataFrameToolsTest extends FunSuite with BeforeAndAfterAll {
+
private var spark: SparkSession = _
+ private var daemon: IoTDB = _
private var session: Session = _
@Before
@@ -46,7 +48,7 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
spark = SparkSession
.builder()
.config("spark.master", "local")
- .appName("TSFile test")
+ .appName("unit test")
.getOrCreate()
session = new Session("127.0.0.1", 6667, "root", "root")
@@ -66,50 +68,29 @@ class IoTDBWriteTest extends FunSuite with BeforeAndAfterAll {
super.afterAll()
}
- test("test insert wide data") {
- val df = spark.createDataFrame(List(
- (1L, 1, 1L, 1.0F, 1.0D, true, "hello"),
- (2L, 2, 2L, 2.0F, 2.0D, false, "world")))
-
- val dfWithColumn = df.withColumnRenamed("_1", "Time")
- .withColumnRenamed("_2", "root.test.d0.s0")
- .withColumnRenamed("_3", "root.test.d0.s1")
- .withColumnRenamed("_4", "root.test.d0.s2")
- .withColumnRenamed("_5", "root.test.d0.s3")
- .withColumnRenamed("_6", "root.test.d0.s4")
- .withColumnRenamed("_7", "root.test.d0.s5")
- dfWithColumn.write.format("org.apache.iotdb.spark.db")
- .option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .save
-
- val result = session.executeQueryStatement("select ** from root")
- var size = 0
- while (result.hasNext) {
- size += 1
- }
- assertResult(2)(size)
- }
-
- test("test insert narrow data") {
+ test("test insertDataFrame method") {
val df = spark.createDataFrame(List(
(1L, "root.test.d0",1, 1L, 1.0F, 1.0D, true, "hello"),
(2L, "root.test.d0", 2, 2L, 2.0F, 2.0D, false, "world")))
val dfWithColumn = df.withColumnRenamed("_1", "Time")
- .withColumnRenamed("_2", "device_name")
+ .withColumnRenamed("_2", "Device")
.withColumnRenamed("_3", "s0")
.withColumnRenamed("_4", "s1")
.withColumnRenamed("_5", "s2")
.withColumnRenamed("_6", "s3")
.withColumnRenamed("_7", "s4")
.withColumnRenamed("_8", "s5")
- dfWithColumn.write.format("org.apache.iotdb.spark.db")
- .option("url", "jdbc:iotdb://127.0.0.1:6667/")
- .save
+
+ val optionsMap = Map("url" -> "jdbc:iotdb://127.0.0.1:6667/", "numPartition" -> "1")
+ val options = new IoTDBOptions(optionsMap)
+
+ DataFrameTools.insertDataFrame(options, dfWithColumn)
val result = session.executeQueryStatement("select * from root")
var size = 0
while (result.hasNext) {
+ result.next()
size += 1
}
assertResult(2)(size)