You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2019/11/07 13:39:21 UTC
[incubator-iotdb] branch master updated: Fixed some variable naming problem (#528)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new efac7c4 Fixed some variable naming problem (#528)
efac7c4 is described below
commit efac7c4ba3a46429d73df18108e9c3bed6e44dda
Author: Francis Du <fr...@francisdu.com>
AuthorDate: Thu Nov 7 21:39:15 2019 +0800
Fixed some variable naming problem (#528)
---
.../org/apache/iotdb/spark/db/Transformer.scala | 64 +++++++++++-----------
.../apache/iotdb/spark/tsfile/DefaultSource.scala | 16 +++---
.../apache/iotdb/spark/tsfile/Transformer.scala | 56 +++++++++----------
3 files changed, 67 insertions(+), 69 deletions(-)
diff --git a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
index 1c2c89b..347ef30 100644
--- a/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
+++ b/spark-iotdb-connector/src/main/scala/org/apache/iotdb/spark/db/Transformer.scala
@@ -41,29 +41,28 @@ object Transformer {
* | 1| root.ln.d1| 11| 12|null|
* +---------+-----------+---+---+----+
*/
- def toNarrowForm(spark: SparkSession,
- df: DataFrame): DataFrame = {
+ def toNarrowForm(spark: SparkSession, df: DataFrame): DataFrame = {
df.createOrReplaceTempView("tsfle_wide_form")
// use to record device and their measurement
var map = new scala.collection.mutable.HashMap[String, List[String]]()
// use to record all the measurement, prepare for the union
- var m_map = scala.collection.mutable.HashMap[String, DataType]()
+ var mMap = scala.collection.mutable.HashMap[String, DataType]()
// this step is to record device_name and measurement_name
df.schema.foreach(f => {
if (!SQLConstant.TIMESTAMP_STR.equals(f.name)) {
val pos = f.name.lastIndexOf('.')
- val divice_name = f.name.substring(0, pos)
- val measurement_name = f.name.substring(pos + 1)
- if (map.contains(divice_name)) {
- map(divice_name) = map(divice_name) :+ measurement_name
+ val diviceName = f.name.substring(0, pos)
+ val measurementName = f.name.substring(pos + 1)
+ if (map.contains(diviceName)) {
+ map(diviceName) = map(diviceName) :+ measurementName
}
else {
var l: List[String] = List()
- l = l :+ measurement_name
- map += (divice_name -> l)
+ l = l :+ measurementName
+ map += (diviceName -> l)
}
- m_map += (measurement_name -> f.dataType)
+ mMap += (measurementName -> f.dataType)
}
})
@@ -75,15 +74,15 @@ object Transformer {
// | 1| root.ln.d1| 11| 12|null|
// +---------+-----------+---+---+----+
var res: org.apache.spark.sql.DataFrame = null
- map.keys.foreach { device_name =>
+ map.keys.foreach { deviceName =>
// build query
- var query = "select " + SQLConstant.TIMESTAMP_STR + ", \"" + device_name + "\" as device_name"
- val measurement_name = map(device_name)
- m_map.keySet.foreach { m =>
- val pos = measurement_name.indexOf(m)
+ var query = "select " + SQLConstant.TIMESTAMP_STR + ", \"" + deviceName + "\" as device_name"
+ val measurementName = map(deviceName)
+ mMap.keySet.foreach { m =>
+ val pos = measurementName.indexOf(m)
if (pos >= 0) {
// select normal column
- query += ", `" + device_name + "." + m + "` as " + m
+ query += ", `" + deviceName + "." + m + "` as " + m
}
else {
// fill null column
@@ -92,13 +91,13 @@ object Transformer {
}
query += " from tsfle_wide_form"
- var cur_df = spark.sql(query)
+ val curDF = spark.sql(query)
if (res == null) {
- res = cur_df
+ res = curDF
}
else {
- res = res.union(cur_df)
+ res = res.union(curDF)
}
}
@@ -124,20 +123,19 @@ object Transformer {
* +---------+-------------+-------------+-------------+-------------+-------------+-------------+
*
*/
- def toWideForm(spark: SparkSession,
- df: DataFrame): DataFrame = {
+ def toWideForm(spark: SparkSession, df: DataFrame): DataFrame = {
df.createOrReplaceTempView("tsfle_narrow_form")
// get all device_name
- val device_names = spark.sql("select distinct device_name from tsfle_narrow_form").collect()
- val table_df = spark.sql("select * from tsfle_narrow_form")
+ val deviceNames = spark.sql("select distinct device_name from tsfle_narrow_form").collect()
+ val tableDF = spark.sql("select * from tsfle_narrow_form")
import scala.collection.mutable.ListBuffer
// get all measurement_name
- val measurement_names = new ListBuffer[String]()
+ val measurementNames = new ListBuffer[String]()
- table_df.schema.foreach(f => {
+ tableDF.schema.foreach(f => {
if (!SQLConstant.TIMESTAMP_STR.equals(f.name) && !"device_name".equals(f.name)) {
- measurement_names += f.name
+ measurementNames += f.name
}
})
@@ -149,21 +147,21 @@ object Transformer {
// | 1| 11| 12| null| 21| 22| 23|
// +---------+-------------+-------------+-------------+-------------+-------------+-------------+
- device_names.foreach(device_name => {
+ deviceNames.foreach(deviceName => {
var query = "select " + SQLConstant.TIMESTAMP_STR
- measurement_names.foreach(measurement_name => {
- query = query + ", " + measurement_name + " as `" + device_name(0) + "." + measurement_name + "`"
+ measurementNames.foreach(measurementName => {
+ query = query + ", " + measurementName + " as `" + deviceName(0) + "." + measurementName + "`"
})
- query = query + " from tsfle_narrow_form where device_name = \"" + device_name(0) + "\""
- val cur_df = spark.sql(query)
+ query = query + " from tsfle_narrow_form where device_name = \"" + deviceName(0) + "\""
+ val curDF = spark.sql(query)
if (res == null) {
- res = cur_df
+ res = curDF
}
else {
- res = res.join(cur_df, List(SQLConstant.TIMESTAMP_STR), "outer")
+ res = res.join(curDF, List(SQLConstant.TIMESTAMP_STR), "outer")
}
})
diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala
index de13b2a..76329c0 100755
--- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala
+++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/DefaultSource.scala
@@ -115,12 +115,12 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
}
if (options.getOrElse(DefaultSource.isNarrowForm, "").equals("narrow_form")) {
- val device_names = tsFileMetaData.getDeviceMap.keySet()
- val measurement_names = tsFileMetaData.getMeasurementSchema.keySet()
+ val deviceNames = tsFileMetaData.getDeviceMap.keySet()
+ val measurementNames = tsFileMetaData.getMeasurementSchema.keySet()
// construct queryExpression based on queriedSchema and filters
- val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, device_names,
- measurement_names, filters, reader, file.start.asInstanceOf[java.lang.Long],
+ val queryExpressions = NarrowConverter.toQueryExpression(dataSchema, deviceNames,
+ measurementNames, filters, reader, file.start.asInstanceOf[java.lang.Long],
(file.start + file.length).asInstanceOf[java.lang.Long])
val queryDataSets = Executor.query(readTsFile, queryExpressions,
@@ -128,7 +128,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
(file.start + file.length).asInstanceOf[java.lang.Long])
var queryDataSet: QueryDataSet = null
- var device_name: String = null
+ var deviceName: String = null
def queryNext(): Boolean = {
if (queryDataSet != null && queryDataSet.hasNext) {
@@ -146,7 +146,7 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
}
queryDataSet = queryDataSets.remove(queryDataSets.size() - 1)
}
- device_name = queryDataSet.getPaths.get(0).getDevice
+ deviceName = queryDataSet.getPaths.get(0).getDevice
true
}
@@ -174,10 +174,10 @@ private[tsfile] class DefaultSource extends FileFormat with DataSourceRegister {
rowBuffer(index) = curRecord.getTimestamp
}
else if (field.name == NarrowConverter.DEVICE_NAME) {
- rowBuffer(index) = device_name
+ rowBuffer(index) = deviceName
}
else {
- val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(device_name,
+ val pos = paths.indexOf(new org.apache.iotdb.tsfile.read.common.Path(deviceName,
field.name))
var curField: Field = null
if (pos != -1) {
diff --git a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala
index eeff973..8dcccf5 100644
--- a/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala
+++ b/spark-tsfile/src/main/scala/org/apache/iotdb/spark/tsfile/Transformer.scala
@@ -44,27 +44,27 @@ object Transformer {
*/
def toNewForm(spark: SparkSession,
df: DataFrame): DataFrame = {
- df.registerTempTable("tsfle_old_form")
+ df.createOrReplaceTempView("tsfle_old_form")
// use to record device and their measurement
var map = new scala.collection.mutable.HashMap[String, List[String]]()
// use to record all the measurement, prepare for the union
- var m_map = scala.collection.mutable.HashMap[String, DataType]()
+ var mMap = scala.collection.mutable.HashMap[String, DataType]()
// this step is to record device_name and measurement_name
df.schema.foreach(f => {
if (!QueryConstant.RESERVED_TIME.equals(f.name)) {
val pos = f.name.lastIndexOf('.')
- val divice_name = f.name.substring(0, pos)
+ val diviceName = f.name.substring(0, pos)
val measurement_name = f.name.substring(pos + 1)
- if (map.contains(divice_name)) {
- map(divice_name) = map(divice_name) :+ measurement_name
+ if (map.contains(diviceName)) {
+ map(diviceName) = map(diviceName) :+ measurement_name
}
else {
var l: List[String] = List()
l = l :+ (measurement_name)
- map += (divice_name -> l)
+ map += (diviceName -> l)
}
- m_map += (measurement_name -> f.dataType)
+ mMap += (measurement_name -> f.dataType)
}
})
@@ -76,15 +76,15 @@ object Transformer {
// | 1| root.ln.d1| 11| 12|null|
// +---------+-----------+---+---+----+
var res: org.apache.spark.sql.DataFrame = null
- map.keys.foreach { device_name =>
+ map.keys.foreach { deviceName =>
// build query
- var query = "select " + QueryConstant.RESERVED_TIME + ", \"" + device_name + "\" as device_name"
- val measurement_name = map(device_name)
- m_map.keySet.foreach { m =>
+ var query = "select " + QueryConstant.RESERVED_TIME + ", \"" + deviceName + "\" as device_name"
+ val measurement_name = map(deviceName)
+ mMap.keySet.foreach { m =>
val pos = measurement_name.indexOf(m)
if (pos >= 0) {
// select normal column
- query += ", `" + device_name + "." + m + "` as " + m
+ query += ", `" + deviceName + "." + m + "` as " + m
}
else {
// fill null column
@@ -93,13 +93,13 @@ object Transformer {
}
query += " from tsfle_old_form"
- var cur_df = spark.sql(query)
+ val curDF = spark.sql(query)
if (res == null) {
- res = cur_df
+ res = curDF
}
else {
- res = res.union(cur_df)
+ res = res.union(curDF)
}
}
@@ -127,18 +127,18 @@ object Transformer {
*/
def toOldForm(spark: SparkSession,
df: DataFrame): DataFrame = {
- df.registerTempTable("tsfle_new_form")
+ df.createOrReplaceTempView("tsfle_new_form")
// get all device_name
- val device_names = spark.sql("select distinct device_name from tsfle_new_form").collect()
- val table_df = spark.sql("select * from tsfle_new_form")
+ val deviceNames = spark.sql("select distinct device_name from tsfle_new_form").collect()
+ val tableDF= spark.sql("select * from tsfle_new_form")
import scala.collection.mutable.ListBuffer
// get all measurement_name
- val measurement_names = new ListBuffer[String]()
+ val measurementNames = new ListBuffer[String]()
- table_df.schema.foreach(f => {
+ tableDF.schema.foreach(f => {
if (!QueryConstant.RESERVED_TIME.equals(f.name) && !"device_name".equals(f.name)) {
- measurement_names += f.name
+ measurementNames += f.name
}
})
@@ -150,21 +150,21 @@ object Transformer {
// | 1| 11| 12| null| 21| 22| 23|
// +---------+-------------+-------------+-------------+-------------+-------------+-------------+
- device_names.foreach(device_name => {
+ deviceNames.foreach(deviceName => {
var query = "select " + QueryConstant.RESERVED_TIME
- measurement_names.foreach(measurement_name => {
- query = query + ", " + measurement_name + " as `" + device_name(0) + "." + measurement_name + "`"
+ measurementNames.foreach(measurementName => {
+ query = query + ", " + measurementName + " as `" + deviceName(0) + "." + measurementName + "`"
})
- query = query + " from tsfle_new_form where device_name = \"" + device_name(0) + "\""
- val cur_df = spark.sql(query)
+ query = query + " from tsfle_new_form where device_name = \"" + deviceName(0) + "\""
+ val curDF = spark.sql(query)
if (res == null) {
- res = cur_df
+ res = curDF
}
else {
- res = res.join(cur_df, List(QueryConstant.RESERVED_TIME), "outer")
+ res = res.join(curDF, List(QueryConstant.RESERVED_TIME), "outer")
}
})