You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by gu...@apache.org on 2019/08/19 13:19:46 UTC

[griffin] branch master updated: [GRIFFIN-278] AvroBatchDataConnector handle input is directory

This is an automated email from the ASF dual-hosted git repository.

guoyp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 66df6cd  [GRIFFIN-278] AvroBatchDataConnector handle input is directory
66df6cd is described below

commit 66df6cd9ebe1e3384225ec08480f4ee4ab2f12e7
Author: Johnnie <jo...@gmail.com>
AuthorDate: Mon Aug 19 21:19:25 2019 +0800

    [GRIFFIN-278] AvroBatchDataConnector handle input is directory
    
    AvroBatchDataConnector processes data at the file level and needs to handle the case where the input is a directory.
    
    Author: Johnnie <jo...@gmail.com>
    
    Closes #521 from joohnnie/GRIFFIN-278.
---
 .../{config-batch.json => config-batch-path.json}        |   4 ++--
 measure/src/main/resources/config-batch.json             |   4 ++--
 measure/src/main/resources/env-batch.json                |   2 +-
 .../connector/batch/AvroBatchDataConnector.scala         |  10 +++++-----
 .../test/resources/users_info_src/users_info_src.avro    | Bin 0 -> 3850 bytes
 .../resources/users_info_target/users_info_target.avro   | Bin 0 -> 3852 bytes
 6 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch-path.json
similarity index 87%
copy from measure/src/main/resources/config-batch.json
copy to measure/src/main/resources/config-batch-path.json
index d2257a0..6aab127 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch-path.json
@@ -12,7 +12,7 @@
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
+            "file.path": "measure/src/test/resources/users_info_src"
           }
         }
       ]
@@ -23,7 +23,7 @@
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
+            "file.path": "measure/src/test/resources/users_info_target"
           }
         }
       ]
diff --git a/measure/src/main/resources/config-batch.json b/measure/src/main/resources/config-batch.json
index d2257a0..69ad485 100644
--- a/measure/src/main/resources/config-batch.json
+++ b/measure/src/main/resources/config-batch.json
@@ -12,7 +12,7 @@
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "src/test/resources/users_info_src.avro"
+            "file.name": "measure/src/test/resources/users_info_src.avro"
           }
         }
       ]
@@ -23,7 +23,7 @@
           "type": "avro",
           "version": "1.7",
           "config": {
-            "file.name": "src/test/resources/users_info_target.avro"
+            "file.name": "measure/src/test/resources/users_info_target.avro"
           }
         }
       ]
diff --git a/measure/src/main/resources/env-batch.json b/measure/src/main/resources/env-batch.json
index bed6ed8..f2a1639 100644
--- a/measure/src/main/resources/env-batch.json
+++ b/measure/src/main/resources/env-batch.json
@@ -25,7 +25,7 @@
       "type": "ELASTICSEARCH",
       "config": {
         "method": "post",
-        "api": "http://10.148.181.248:39200/griffin/accuracy",
+        "api": "http://localhost:9200/griffin/accuracy",
         "connection.timeout": "1m",
         "retry": 10
       }
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
index 457eaa9..4587cca 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -34,15 +34,15 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
                                   timestampStorage: TimestampStorage
                                  ) extends BatchDataConnector {
 
-  val config = dcParam.getConfig
+  val config: Map[String, Any] = dcParam.getConfig
 
   val FilePath = "file.path"
   val FileName = "file.name"
 
-  val filePath = config.getString(FilePath, "")
-  val fileName = config.getString(FileName, "")
+  val filePath: String = config.getString(FilePath, "")
+  val fileName: String = config.getString(FileName, "")
 
-  val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+  val concreteFileFullPath: String = if (pathPrefix()) filePath else fileName
 
   private def pathPrefix(): Boolean = {
     filePath.nonEmpty
@@ -53,7 +53,7 @@ case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
   }
 
   def data(ms: Long): (Option[DataFrame], TimeRange) = {
-    assert(fileExist(), s"Avro file ${concreteFileFullPath} is not exists!")
+    assert(fileExist(), s"Avro file $concreteFileFullPath is not exists!")
     val dfOpt = {
       val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
       val dfOpt = Some(df)
diff --git a/measure/src/test/resources/users_info_src/users_info_src.avro b/measure/src/test/resources/users_info_src/users_info_src.avro
new file mode 100644
index 0000000..3d5c939
Binary files /dev/null and b/measure/src/test/resources/users_info_src/users_info_src.avro differ
diff --git a/measure/src/test/resources/users_info_target/users_info_target.avro b/measure/src/test/resources/users_info_target/users_info_target.avro
new file mode 100644
index 0000000..104dd6c
Binary files /dev/null and b/measure/src/test/resources/users_info_target/users_info_target.avro differ