You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/08/19 13:19:46 UTC
[griffin] branch master updated: [GRIFFIN-278]
AvroBatchDataConnector handle input is directory
This is an automated email from the ASF dual-hosted git repository.
guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git
The following commit(s) were added to refs/heads/master by this push:
new 66df6cd [GRIFFIN-278] AvroBatchDataConnector handle input is directory
66df6cd is described below
commit 66df6cd9ebe1e3384225ec08480f4ee4ab2f12e7
Author: Johnnie <jo...@gmail.com>
AuthorDate: Mon Aug 19 21:19:25 2019 +0800
[GRIFFIN-278] AvroBatchDataConnector handle input is directory
AvroBatchDataConnector processes data at the file level and needs to handle the case when the input is a directory.
Author: Johnnie <jo...@gmail.com>
Closes #521 from joohnnie/GRIFFIN-278.
---
.../{config-batch.json => config-batch-path.json} | 4 ++--
measure/src/main/resources/config-batch.json | 4 ++--
measure/src/main/resources/env-batch.json | 2 +-
.../connector/batch/AvroBatchDataConnector.scala | 10 +++++-----
.../test/resources/users_info_src/users_info_src.avro | Bin 0 -> 3850 bytes
.../resources/users_info_target/users_info_target.avro | Bin 0 -> 3852 bytes
6 files changed, 10 insertions(+), 10 deletions(-)
diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch-path.json
similarity index 87%
copy from measure/src/main/resources/config-batch.json
copy to measure/src/main/resources/config-batch-path.json
index d2257a0..6aab127 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch-path.json
@@ -12,7 +12,7 @@
"type": "avro",
"version": "1.7",
"config": {
- "file.name": "src/test/resources/users_info_src.avro"
+ "file.path": "measure/src/test/resources/users_info_src"
}
}
]
@@ -23,7 +23,7 @@
"type": "avro",
"version": "1.7",
"config": {
- "file.name": "src/test/resources/users_info_target.avro"
+ "file.path": "measure/src/test/resources/users_info_target"
}
}
]
diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json
index d2257a0..69ad485 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch.json
@@ -12,7 +12,7 @@
"type": "avro",
"version": "1.7",
"config": {
- "file.name": "src/test/resources/users_info_src.avro"
+ "file.name": "measure/src/test/resources/users_info_src.avro"
}
}
]
@@ -23,7 +23,7 @@
"type": "avro",
"version": "1.7",
"config": {
- "file.name": "src/test/resources/users_info_target.avro"
+ "file.name": "measure/src/test/resources/users_info_target.avro"
}
}
]
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index bed6ed8..f2a1639 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -25,7 +25,7 @@
"type": "ELASTICSEARCH",
"config": {
"method": "post",
- "api": "http://10.148.181.248:39200/griffin/accuracy",
+ "api": "http://localhost:9200/griffin/accuracy",
"connection.timeout": "1m",
"retry": 10
}
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 457eaa9..4587cca 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -34,15 +34,15 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
timestampStorage: TimestampStorage
) extends BatchDataConnector {
- val config = dcParam.getConfig
+ val config: Map[String, Any] = dcParam.getConfig
val FilePath = "file.path"
val FileName = "file.name"
- val filePath = config.getString(FilePath, "")
- val fileName = config.getString(FileName, "")
+ val filePath: String = config.getString(FilePath, "")
+ val fileName: String = config.getString(FileName, "")
- val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+ val concreteFileFullPath: String = if (pathPrefix()) filePath else fileName
private def pathPrefix(): Boolean = {
filePath.nonEmpty
@@ -53,7 +53,7 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
}
def data(ms: Long): (Option[DataFrame], TimeRange) = {
- assert(fileExist(), s"Avro file ${concreteFileFullPath} is not exists!")
+ assert(fileExist(), s"Avro file $concreteFileFullPath is not exists!")
val dfOpt = {
val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
val dfOpt = Some(df)
diff --git a/measure/src/test/resources/users_info_src/users_info_src.avro b/measure/src/test/resources/users_info_src/users_info_src.avro
new file mode 100644
index 0000000..3d5c939
Binary files /dev/null and b/measure/src/test/resources/users_info_src/users_info_src.avro differ
diff --git a/measure/src/test/resources/users_info_target/users_info_target.avro b/measure/src/test/resources/users_info_target/users_info_target.avro
new file mode 100644
index 0000000..104dd6c
Binary files /dev/null and b/measure/src/test/resources/users_info_target/users_info_target.avro differ