You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by li...@apache.org on 2018/04/26 08:12:47 UTC

[24/50] [abbrv] incubator-griffin git commit: pass streaming

pass streaming


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/365a85d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/365a85d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/365a85d1

Branch: refs/heads/griffin-0.2.0-incubating-rc4
Commit: 365a85d14028dcf0d5f2e77f3e152e8dca75a504
Parents: 6dd65d3
Author: Lionel Liu <bh...@163.com>
Authored: Tue Apr 17 15:44:49 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Apr 17 15:44:49 2018 +0800

----------------------------------------------------------------------
 .../griffin/measure/process/engine/DataFrameOprEngine.scala  | 4 +---
 .../scala/org/apache/griffin/measure/utils/HdfsUtil.scala    | 2 +-
 .../src/test/resources/_accuracy-streaming-griffindsl.json   | 8 +++++---
 .../test/resources/_completeness-streaming-griffindsl.json   | 7 ++++---
 .../src/test/resources/_profiling-streaming-griffindsl.json  | 7 ++++---
 measure/src/test/resources/env-streaming.json                | 1 +
 6 files changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
index 600da45..c06406c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -120,11 +120,9 @@ object DataFrameOprs {
       }
     }
 
-    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.bean(classOf[AccuracyResult]))
-
     val df = sqlContext.table(s"`${dfName}`")
 
-    val results = df.flatMap { row =>
+    val results = df.rdd.flatMap { row =>
       try {
         val tmst = getLong(row, InternalColumns.tmst).getOrElse(timeInfo.calcTime)
         val missCount = getLong(row, miss).getOrElse(0L)

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index aa5643b..0a91fab 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -28,7 +28,7 @@ object HdfsUtil extends Loggable {
 
   private val conf = new Configuration()
   conf.setBoolean("dfs.support.append", true)
-//  conf.set("fs.defaultFS", "hdfs://localhost")    // debug @localhost
+  conf.set("fs.defaultFS", "hdfs://localhost")    // debug @localhost
 
   private val dfs = FileSystem.get(conf)
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_accuracy-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index a0e2e7d..240d768 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -13,7 +13,7 @@
           "version": "0.8",
           "config": {
             "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
+              "bootstrap.servers": "10.147.177.107:9092",
               "group.id": "group1",
               "auto.offset.reset": "smallest",
               "auto.commit.enable": "false"
@@ -46,6 +46,7 @@
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
         "time.range": ["-2m", "0"],
+        "init.clear": true,
         "updatable": true
       }
     }, {
@@ -56,7 +57,7 @@
           "version": "0.8",
           "config": {
             "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
+              "bootstrap.servers": "10.147.177.107:9092",
               "group.id": "group1",
               "auto.offset.reset": "smallest",
               "auto.commit.enable": "false"
@@ -88,7 +89,8 @@
         "info.path": "target",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["-2m", "0"]
+        "time.range": ["-2m", "0"],
+        "init.clear": true
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
index df1b889..ba8bdce 100644
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -39,11 +39,12 @@
         }
       ],
       "cache": {
-        "file.path": "hdfs://localhost/griffin/streaming/dump/old",
-        "info.path": "old",
+        "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+        "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": ["0", "0"],
+        "init.clear": true
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_profiling-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
index e662897..b6feb5a 100644
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -12,12 +12,12 @@
           "version": "0.8",
           "config": {
             "kafka.config": {
-              "bootstrap.servers": "10.149.247.156:9092",
+              "bootstrap.servers": "10.147.177.107:9092",
               "group.id": "group1",
               "auto.offset.reset": "smallest",
               "auto.commit.enable": "false"
             },
-            "topics": "sss",
+            "topics": "test",
             "key.type": "java.lang.String",
             "value.type": "java.lang.String"
           },
@@ -43,7 +43,8 @@
         "info.path": "source",
         "ready.time.interval": "10s",
         "ready.time.delay": "0",
-        "time.range": ["0", "0"]
+        "time.range": ["0", "0"],
+        "init.clear": true
       }
     }
   ],

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json
index a01348f..08dd7ee 100644
--- a/measure/src/test/resources/env-streaming.json
+++ b/measure/src/test/resources/env-streaming.json
@@ -4,6 +4,7 @@
     "checkpoint.dir": "hdfs://localhost/test/griffin/cp",
     "batch.interval": "2s",
     "process.interval": "10s",
+    "init.clear": true,
     "config": {
       "spark.master": "local[*]",
       "spark.task.maxFailures": 5,