Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/07/11 02:49:17 UTC

[incubator-iotdb] branch master updated: [IOTDB-91] Improve tsfile-spark-connector to support spark 2.4.3 (#227)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0afdab3  [IOTDB-91] Improve tsfile-spark-connector to support spark 2.4.3 (#227)
0afdab3 is described below

commit 0afdab37c80bb5be944b6209e29fd36ea364845c
Author: Zesong Sun <15...@smail.nju.edu.cn>
AuthorDate: Thu Jul 11 10:49:13 2019 +0800

    [IOTDB-91] Improve tsfile-spark-connector to support spark 2.4.3 (#227)
    
    * update tsfile-spark-connector to support spark 2.4.3
---
 .../Documentation/UserGuideV0.7.0/7-Tools-spark.md |  2 +-
 pom.xml                                            |  2 +-
 spark/README.md                                    |  2 +-
 .../scala/org/apache/iotdb/tsfile/Converter.scala  | 24 +++++++++++-----------
 .../apache/iotdb/tsfile/TsFileOutputWriter.scala   |  6 +++---
 .../apache/iotdb/tsfile/TsFileWriterFactory.scala  |  5 ++++-
 .../org/apache/iotdb/tsfile/ConverterTest.scala    |  7 ++++---
 7 files changed, 26 insertions(+), 22 deletions(-)
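
The source changes below all follow from one thing: between Spark 2.0.1 and
2.4.3, the internal data source writer hooks changed shape. Roughly, the
2.4.x contracts this commit implements look like this (a simplified sketch
of classes in Spark's internal, unstable
org.apache.spark.sql.execution.datasources package, not part of the commit):

    import org.apache.hadoop.mapreduce.TaskAttemptContext
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.types.StructType

    abstract class OutputWriterFactory extends Serializable {
      // Suffix appended to generated part-file names (new obligation).
      def getFileExtension(context: TaskAttemptContext): String
      // The 2.0-era bucketId: Option[Int] parameter is gone.
      def newInstance(
          path: String,
          dataSchema: StructType,
          context: TaskAttemptContext): OutputWriter
    }

    abstract class OutputWriter {
      // Rows now arrive as InternalRow (positional catalyst values),
      // not as org.apache.spark.sql.Row.
      def write(row: InternalRow): Unit
      def close(): Unit
    }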

diff --git a/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md b/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
index 3f88d4e..1d50a05 100644
--- a/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
+++ b/docs/Documentation/UserGuideV0.7.0/7-Tools-spark.md
@@ -61,7 +61,7 @@ With this connector, you can
 
 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1`        | `2.11.8`        | `1.8`        | `0.8.0-SNAPSHOT`|
+| `2.4.3`        | `2.11.8`        | `1.8`        | `0.8.0-SNAPSHOT`|
 
 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
 
diff --git a/pom.xml b/pom.xml
index d8544b9..9c77be2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -55,7 +55,7 @@
         <slf4j.version>1.7.12</slf4j.version>
         <logback.version>1.1.11</logback.version>
         <joda.version>2.9.9</joda.version>
-        <spark.version>2.0.1</spark.version>
+        <spark.version>2.4.3</spark.version>
         <common.io.version>2.5</common.io.version>
         <commons.collections4>4.0</commons.collections4>
         <thrift.version>0.9.3</thrift.version>
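
The version bump itself is a one-line Maven property change; everything else
in the commit adapts the code to the newer API. As a quick sanity check that
the connector is actually linked against the expected Spark, something like
this works from any test or spark-shell (a sketch; the local[*] master is
just for illustration):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("spark-version-check")
      .getOrCreate()
    println(spark.version)   // should print 2.4.3 after this change
    spark.stop()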
diff --git a/spark/README.md b/spark/README.md
index 3f88d4e..1d50a05 100644
--- a/spark/README.md
+++ b/spark/README.md
@@ -61,7 +61,7 @@ With this connector, you can
 
 |Spark Version | Scala Version | Java Version | TsFile |
 |------------- | ------------- | ------------ |------------ |
-| `2.0.1`        | `2.11.8`        | `1.8`        | `0.8.0-SNAPSHOT`|
+| `2.4.3`        | `2.11.8`        | `1.8`        | `0.8.0-SNAPSHOT`|
 
 > Note: For more information about how to download and use TsFile, please see the following link: https://github.com/apache/incubator-iotdb/tree/master/tsfile.
 
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
index 09156f7..4ad3c66 100755
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/Converter.scala
@@ -36,7 +36,7 @@ import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.iotdb.tsfile.write.record.TSRecord
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint
 import org.apache.iotdb.tsfile.write.schema.{FileSchema, MeasurementSchema, SchemaBuilder}
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 
@@ -297,12 +297,12 @@ object Converter {
     * @param row given spark sql row
     * @return TSRecord
     */
-  def toTsRecord(row: Row): List[TSRecord] = {
-    val schema = row.schema
-    val time = row.getAs[Long](QueryConstant.RESERVED_TIME)
+  def toTsRecord(row: InternalRow, dataSchema: StructType): List[TSRecord] = {
+    val time = row.getLong(0)
     val deviceToRecord = scala.collection.mutable.Map[String, TSRecord]()
+    var index = 1
 
-    schema.fields.filter(f => {
+    dataSchema.fields.filter(f => {
       !QueryConstant.RESERVED_TIME.equals(f.name)
     }).foreach(f => {
       val name = f.name
@@ -315,20 +315,20 @@ object Converter {
       val tsRecord: TSRecord = deviceToRecord.getOrElse(device, new TSRecord(time, device))
 
       val dataType = getTsDataType(f.dataType)
-      val index = row.fieldIndex(name)
       if (!row.isNullAt(index)) {
         val value = f.dataType match {
-          case BooleanType => row.getAs[Boolean](name)
-          case IntegerType => row.getAs[Int](name)
-          case LongType => row.getAs[Long](name)
-          case FloatType => row.getAs[Float](name)
-          case DoubleType => row.getAs[Double](name)
-          case StringType => row.getAs[String](name)
+          case BooleanType => row.getBoolean(index)
+          case IntegerType => row.getInt(index)
+          case LongType => row.getLong(index)
+          case FloatType => row.getFloat(index)
+          case DoubleType => row.getDouble(index)
+          case StringType => row.getString(index)
           case other => throw new UnsupportedOperationException(s"Unsupported type $other")
         }
         val dataPoint = DataPoint.getDataPoint(dataType, measurement, value.toString)
         tsRecord.addTuple(dataPoint)
       }
+      index += 1
     })
     deviceToRecord.values.toList
   }
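
The Converter change replaces name-based lookups on Row with positional
reads on InternalRow; since an InternalRow carries no schema, toTsRecord now
takes the StructType explicitly and walks the fields by ordinal. A
standalone illustration of the access pattern (hypothetical values, not from
the commit; note that catalyst stores strings as UTF8String):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
    import org.apache.spark.unsafe.types.UTF8String

    val row: InternalRow = new GenericInternalRow(
      Array[Any](1L, UTF8String.fromString("device_1.sensor_1"), 1.2f))

    val time = row.getLong(0)    // replaces row.getAs[Long]("time")
    val name = row.getString(1)  // converts the stored UTF8String back
    val v    = if (!row.isNullAt(2)) row.getFloat(2) else Float.NaN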
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
index 0ced194..88a6755 100644
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileOutputWriter.scala
@@ -22,9 +22,9 @@ import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext}
 import org.apache.iotdb.tsfile.io.TsFileOutputFormat
 import org.apache.iotdb.tsfile.write.record.TSRecord
-import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.datasources.OutputWriter
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.catalyst.InternalRow
 
 private[tsfile] class TsFileOutputWriter(
                                           pathStr: String,
@@ -37,9 +37,9 @@ private[tsfile] class TsFileOutputWriter(
     new TsFileOutputFormat(fileSchema).getRecordWriter(context)
   }
 
-  override def write(row: Row): Unit = {
+  override def write(row: InternalRow): Unit = {
     if (row != null) {
-      val tsRecord = Converter.toTsRecord(row)
+      val tsRecord = Converter.toTsRecord(row, dataSchema)
       tsRecord.foreach(r => {
         recordWriter.write(NullWritable.get(), r)
       })
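
For orientation: Spark creates one of these writers per task output file,
calls write once per InternalRow, and then close. A rough sketch of the
calling pattern (names simplified; the real driver is Spark's internal
FileFormatWriter):

    // Hypothetical driver loop, for illustration only.
    val writer = factory.newInstance(path, dataSchema, context)
    try rows.foreach(writer.write)   // rows: Iterator[InternalRow]
    finally writer.close()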
diff --git a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
index d78d093..8b0079a 100644
--- a/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
+++ b/spark/src/main/scala/org/apache/iotdb/tsfile/TsFileWriterFactory.scala
@@ -27,9 +27,12 @@ private[tsfile] class TsFileWriterFactory(options: Map[String, String]) extends
 
   override def newInstance(
                             path: String,
-                            bucketId: Option[Int],
                             dataSchema: StructType,
                             context: TaskAttemptContext): OutputWriter = {
     new TsFileOutputWriter(path, dataSchema, options, context)
   }
+
+  override def getFileExtension(context: TaskAttemptContext): String = {
+    null
+  }
 }
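
getFileExtension is the new obligation in 2.4: Spark appends its return
value to the part-file names it generates. Returning null satisfies the
compiler, though a real suffix would make the output self-describing; a
possible alternative (my assumption, not what the commit does):

    // Hypothetical variant: tag generated part files as TsFiles.
    override def getFileExtension(context: TaskAttemptContext): String =
      ".tsfile"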
diff --git a/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala b/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
index 9f2f36d..86bbd82 100644
--- a/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
+++ b/spark/src/test/scala/org/apache/iotdb/tsfile/ConverterTest.scala
@@ -32,7 +32,8 @@ import org.apache.iotdb.tsfile.read.TsFileSequenceReader
 import org.apache.iotdb.tsfile.read.common.Field
 import org.apache.iotdb.tsfile.utils.Binary
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, GenericRowWithSchema}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.junit.Assert
@@ -223,8 +224,8 @@ class ConverterTest extends FunSuite with BeforeAndAfterAll {
     fields.add(StructField("device_2.sensor_2", IntegerType, true))
     val schema = StructType(fields)
 
-    var row: GenericRowWithSchema = new GenericRowWithSchema(Array(1L, null, 1.2f, 20, 19, 2.3f, 11), schema)
-    val records = Converter.toTsRecord(row)
+    val row: InternalRow = new GenericInternalRow(Array(1L, null, 1.2f, 20, 19, 2.3f, 11))
+    val records = Converter.toTsRecord(row, schema)
 
     Assert.assertEquals(2, records.size)
     Assert.assertEquals(1, records(0).time)
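
One subtlety in the updated test: GenericInternalRow wraps raw catalyst
values with no attached schema, which is exactly why the StructType is now
passed to toTsRecord alongside the row, and why nulls are probed by ordinal.
A tiny sketch of that behavior (hypothetical values):

    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow

    val r = new GenericInternalRow(Array[Any](1L, null, 1.2f))
    assert(r.getLong(0) == 1L)    // time at ordinal 0
    assert(r.isNullAt(1))         // null cell reported positionally
    assert(r.getFloat(2) == 1.2f)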