You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by li...@apache.org on 2018/04/26 08:12:47 UTC
[24/50] [abbrv] incubator-griffin git commit: pass streaming
pass streaming
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/365a85d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/365a85d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/365a85d1
Branch: refs/heads/griffin-0.2.0-incubating-rc4
Commit: 365a85d14028dcf0d5f2e77f3e152e8dca75a504
Parents: 6dd65d3
Author: Lionel Liu <bh...@163.com>
Authored: Tue Apr 17 15:44:49 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Tue Apr 17 15:44:49 2018 +0800
----------------------------------------------------------------------
.../griffin/measure/process/engine/DataFrameOprEngine.scala | 4 +---
.../scala/org/apache/griffin/measure/utils/HdfsUtil.scala | 2 +-
.../src/test/resources/_accuracy-streaming-griffindsl.json | 8 +++++---
.../test/resources/_completeness-streaming-griffindsl.json | 7 ++++---
.../src/test/resources/_profiling-streaming-griffindsl.json | 7 ++++---
measure/src/test/resources/env-streaming.json | 1 +
6 files changed, 16 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
index 600da45..c06406c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -120,11 +120,9 @@ object DataFrameOprs {
}
}
- implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.bean(classOf[AccuracyResult]))
-
val df = sqlContext.table(s"`${dfName}`")
- val results = df.flatMap { row =>
+ val results = df.rdd.flatMap { row =>
try {
val tmst = getLong(row, InternalColumns.tmst).getOrElse(timeInfo.calcTime)
val missCount = getLong(row, miss).getOrElse(0L)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
index aa5643b..0a91fab 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/utils/HdfsUtil.scala
@@ -28,7 +28,7 @@ object HdfsUtil extends Loggable {
private val conf = new Configuration()
conf.setBoolean("dfs.support.append", true)
-// conf.set("fs.defaultFS", "hdfs://localhost") // debug @localhost
+ conf.set("fs.defaultFS", "hdfs://localhost") // debug @localhost
private val dfs = FileSystem.get(conf)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_accuracy-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_accuracy-streaming-griffindsl.json b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
index a0e2e7d..240d768 100644
--- a/measure/src/test/resources/_accuracy-streaming-griffindsl.json
+++ b/measure/src/test/resources/_accuracy-streaming-griffindsl.json
@@ -13,7 +13,7 @@
"version": "0.8",
"config": {
"kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
+ "bootstrap.servers": "10.147.177.107:9092",
"group.id": "group1",
"auto.offset.reset": "smallest",
"auto.commit.enable": "false"
@@ -46,6 +46,7 @@
"ready.time.interval": "10s",
"ready.time.delay": "0",
"time.range": ["-2m", "0"],
+ "init.clear": true,
"updatable": true
}
}, {
@@ -56,7 +57,7 @@
"version": "0.8",
"config": {
"kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
+ "bootstrap.servers": "10.147.177.107:9092",
"group.id": "group1",
"auto.offset.reset": "smallest",
"auto.commit.enable": "false"
@@ -88,7 +89,8 @@
"info.path": "target",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["-2m", "0"]
+ "time.range": ["-2m", "0"],
+ "init.clear": true
}
}
],
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_completeness-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_completeness-streaming-griffindsl.json b/measure/src/test/resources/_completeness-streaming-griffindsl.json
index df1b889..ba8bdce 100644
--- a/measure/src/test/resources/_completeness-streaming-griffindsl.json
+++ b/measure/src/test/resources/_completeness-streaming-griffindsl.json
@@ -39,11 +39,12 @@
}
],
"cache": {
- "file.path": "hdfs://localhost/griffin/streaming/dump/old",
- "info.path": "old",
+ "file.path": "hdfs://localhost/griffin/streaming/dump/source",
+ "info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": ["0", "0"],
+ "init.clear": true
}
}
],
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/_profiling-streaming-griffindsl.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/_profiling-streaming-griffindsl.json b/measure/src/test/resources/_profiling-streaming-griffindsl.json
index e662897..b6feb5a 100644
--- a/measure/src/test/resources/_profiling-streaming-griffindsl.json
+++ b/measure/src/test/resources/_profiling-streaming-griffindsl.json
@@ -12,12 +12,12 @@
"version": "0.8",
"config": {
"kafka.config": {
- "bootstrap.servers": "10.149.247.156:9092",
+ "bootstrap.servers": "10.147.177.107:9092",
"group.id": "group1",
"auto.offset.reset": "smallest",
"auto.commit.enable": "false"
},
- "topics": "sss",
+ "topics": "test",
"key.type": "java.lang.String",
"value.type": "java.lang.String"
},
@@ -43,7 +43,8 @@
"info.path": "source",
"ready.time.interval": "10s",
"ready.time.delay": "0",
- "time.range": ["0", "0"]
+ "time.range": ["0", "0"],
+ "init.clear": true
}
}
],
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/365a85d1/measure/src/test/resources/env-streaming.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-streaming.json b/measure/src/test/resources/env-streaming.json
index a01348f..08dd7ee 100644
--- a/measure/src/test/resources/env-streaming.json
+++ b/measure/src/test/resources/env-streaming.json
@@ -4,6 +4,7 @@
"checkpoint.dir": "hdfs://localhost/test/griffin/cp",
"batch.interval": "2s",
"process.interval": "10s",
+ "init.clear": true,
"config": {
"spark.master": "local[*]",
"spark.task.maxFailures": 5,