You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/07/11 02:49:17 UTC
[incubator-iotdb] branch master updated: [IOTDB-91] Improve tsfile-spark-connector to support spark 2.4.3 (#227)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 0afdab3 [IOTDB-91] Improve tsfile-spark-connector to support spark 2.4.3 (#227)
0afdab3 is described below
commit 0afdab37c80bb5be944b6209e29fd36ea364845c
Author: Zesong Sun <15...@smail.nju.edu.cn>
AuthorDate: Thu Jul 11 10:49:13 2019 +0800
[IOTDB-91] Improve tsfile-spark-connector to support spark 2.4.3 (#227)
* update tsfile-spark-connector to support spark 2.4.3
---
.../Documentation/UserGuideV0.7.0/7-Tools-spark.md | 2 +-
pom.xml | 2 +-
spark/README.md | 2 +-
.../scala/org/apache/iotdb/tsfile/Converter.scala | 24 +++++++++++-----------
.../apache/iotdb/tsfile/TsFileOutputWriter.scala | 6 +++---
.../apache/iotdb/tsfile/TsFileWriterFactory.scala | 5 ++++-
.../org/apache/iotdb/tsfile/ConverterTest.scala | 7 ++++---
7 files changed, 26 insertions(+), 22 deletions(-)
diff --git a/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md b/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
index 3f88d4e..1d50a05 100644
--- a/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
+++ b/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
@@ -61,7 +61,7 @@ With this connector, you can
|Spark Version | Scala Version | Java Version | TsFile |
|------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
> Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
diff --git a/pom.xml b/pom.xml
index d8544b9..9c77be2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
<slf4j.version>1.7.12</slf4j.version>
<logback.version>1.1.11</logback.version>
<joda.version>2.9.9</joda.version>
- <spark.version>2.0.1</spark.version>
+ <spark.version>2.4.3</spark.version>
<common.io.version>2.5</common.io.version>
<commons.collections4>4.0</commons.collections4>
<thrift.version>0.9.3</thrift.version>
diff --git a/spark/README.md b/spark/README.md
index 3f88d4e..1d50a05 100644
--- a/spark/README.md
+++ b/spark/README.md
@@ -61,7 +61,7 @@ With this connector, you can
|Spark Version | Scala Version | Java Version | TsFile |
|------------- | ------------- | ------------ |------------ |
-| `2.0.1` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
+| `2.4.3` | `2.11.8` | `1.8` | `0.8.0-SNAPSHOT`|
> Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
index 09156f7..4ad3c66 100755
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary
import org.apache.iotdb.tsfile.write.record.TSRecord
import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
import org.apache.iotdb.tsfile.write.schema.{FileSchema, MeasurementSchema, SchemaBuilder}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
@@ -297,12 +297,12 @@ object Converter {
* @param row given spark sql row
* @return TSRecord
*/
- def toTsRecord(row: Row): List[TSRecord] = {
- val schema = row.schema
- val time = row.getAs[Long](QueryConstant.RESERVED_TIME)
+ def toTsRecord(row: InternalRow, dataSchema: StructType): List[TSRecord] = {
+ val time = row.getLong(0)
val deviceToRecord = scala.collection.mutable.Map[String, TSRecord]()
+ var index = 1
- schema.fields.filter(f => {
+ dataSchema.fields.filter(f => {
!QueryConstant.RESERVED_TIME.equals(f.name)
}).foreach(f => {
val name = f.name
@@ -315,20 +315,20 @@ object Converter {
val tsRecord: TSRecord = deviceToRecord.getOrElse(device, new TSRecord(time, device))
val dataType = getTsDataType(f.dataType)
- val index = row.fieldIndex(name)
if (!row.isNullAt(index)) {
val value = f.dataType match {
- case BooleanType => row.getAs[Boolean](name)
- case IntegerType => row.getAs[Int](name)
- case LongType => row.getAs[Long](name)
- case FloatType => row.getAs[Float](name)
- case DoubleType => row.getAs[Double](name)
- case StringType => row.getAs[String](name)
+ case BooleanType => row.getBoolean(index)
+ case IntegerType => row.getInt(index)
+ case LongType => row.getLong(index)
+ case FloatType => row.getFloat(index)
+ case DoubleType => row.getDouble(index)
+ case StringType => row.getString(index)
case other => throw new UnsupportedOperationException(s"Unsupported type $other")
}
val dataPoint = DataPoint.getDataPoint(dataType, measurement, value.toString)
tsRecord.addTuple(dataPoint)
}
+ index += 1
})
deviceToRecord.values.toList
}
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
index 0ced194..88a6755 100644
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
@@ -22,9 +22,9 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
import org.apache.iotdb.tsfile.io.TsFileOutputFormat
import org.apache.iotdb.tsfile.write.record.TSRecord
-import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.OutputWriter
import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.InternalRow
private[tsfile] class TsFileOutputWriter(
pathStr: String,
@@ -37,9 +37,9 @@ private[tsfile] class TsFileOutputWriter(
new TsFileOutputFormat(fileSchema).getRecordWriter(context)
}
- override def write(row: Row): Unit = {
+ override def write(row: InternalRow): Unit = {
if (row != null) {
- val tsRecord = Converter.toTsRecord(row)
+ val tsRecord = Converter.toTsRecord(row, dataSchema)
tsRecord.foreach(r => {
recordWriter.write(NullWritable.get(), r)
})
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
index d78d093..8b0079a 100644
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
@@ -27,9 +27,12 @@ private[tsfile] class TsFileWriterFactory(options: Map[String, String]) extends
override def newInstance(
path: String,
- bucketId: Option[Int],
dataSchema: StructType,
context: TaskAttemptContext): OutputWriter = {
new TsFileOutputWriter(path, dataSchema, options, context)
}
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ null
+ }
}
diff --git a/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala b/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
index 9f2f36d..86bbd82 100644
--- a/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
+++ b/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
@@ -32,7 +32,8 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader
import org.apache.iotdb.tsfile.read.common.Field
import org.apache.iotdb.tsfile.utils.Binary
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericRowWithSchema}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.junit.Assert
@@ -223,8 +224,8 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
fields.add(StructField("device_2.sensor_2", IntegerType, true))
val schema = StructType(fields)
- var row: GenericRowWithSchema = new GenericRowWithSchema(Array(1L, null, 1.2f, 20, 19, 2.3f, 11), schema)
- val records = Converter.toTsRecord(row)
+ val row: InternalRow = new GenericInternalRow(Array(1L, null, 1.2f, 20, 19, 2.3f, 11))
+ val records = Converter.toTsRecord(row, schema)
Assert.assertEquals(2, records.size)
Assert.assertEquals(1, records(0).time)