Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:17 UTC
[02/11] incubator-griffin git commit: Dsl modify
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data.jsonFile b/measure/src/test/resources/test-data.jsonFile
new file mode 100644
index 0000000..73707f4
--- /dev/null
+++ b/measure/src/test/resources/test-data.jsonFile
@@ -0,0 +1,3 @@
+{ "name": "emily", "age": 5, "map": { "a": 1, "b": 2 }, "list": [ { "c": 1, "d": 2 }, { "c": 3, "d": 4 } ], "t": [1, 2, 3] }
+{ "name": "white", "age": 15, "map": { "a": 11, "b": 12 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
+{ "name": "west", "age": 25, "map": { "a": 21, "b": 22 }, "list": [ { "c": 11, "d": 2 }, { "c": 23, "d": 4 } ], "t": [1, 2, 3] }
\ No newline at end of file
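Despite the unusual .jsonFile extension, each record in this fixture sits on its own line, which is the layout Spark's line-oriented JSON reader expects. A minimal sketch of loading it (Spark 1.x API, as used by the suites in this commit; the relative path assumes the measure module as working directory):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("ndjson"))
    val sqlContext = new SQLContext(sc)

    // one JSON document per line -> one row per line, schema inferred
    val df = sqlContext.read.json("src/test/resources/test-data.jsonFile")
    df.printSchema()                  // name, age, map, list, t
    df.select("name", "age").show()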
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data0.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data0.json b/measure/src/test/resources/test-data0.json
new file mode 100644
index 0000000..406acb8
--- /dev/null
+++ b/measure/src/test/resources/test-data0.json
@@ -0,0 +1,56 @@
+[
+ {
+ "name": "emily",
+ "age": 5,
+ "map": {
+ "a": 1,
+ "b": 2
+ },
+ "list": [
+ {
+ "c": 1,
+ "d": 2
+ },
+ {
+ "c": 3,
+ "d": 4
+ }
+ ]
+ },
+ {
+ "name": "white",
+ "age": 15,
+ "map": {
+ "a": 11,
+ "b": 12
+ },
+ "list": [
+ {
+ "c": 11,
+ "d": 2
+ },
+ {
+ "c": 23,
+ "d": 4
+ }
+ ]
+ },
+ {
+ "name": "west",
+ "age": 25,
+ "map": {
+ "a": 21,
+ "b": 22
+ },
+ "list": [
+ {
+ "c": 11,
+ "d": 2
+ },
+ {
+ "c": 23,
+ "d": 4
+ }
+ ]
+ }
+]
\ No newline at end of file
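Unlike test-data.jsonFile above, this fixture is a single JSON array spread over many lines, so Spark 1.x's line-oriented reader cannot parse it directly (the multiLine option only arrived in later Spark releases). A sketch of one workaround, assuming each file fits in memory: read the whole file as one string and let the JSON reader flatten the top-level array into rows.

    import org.apache.spark.sql.{DataFrame, SQLContext}

    def readJsonArrayFile(sqlContext: SQLContext, path: String): DataFrame = {
      // wholeTextFiles yields (path, content) pairs; keep the content only
      val whole = sqlContext.sparkContext.wholeTextFiles(path).map(_._2)
      // a record that is a top-level array of objects becomes one row per element
      sqlContext.read.json(whole)
    }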
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/resources/test-data1.jsonFile
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/test-data1.jsonFile b/measure/src/test/resources/test-data1.jsonFile
new file mode 100644
index 0000000..1e1f28a
--- /dev/null
+++ b/measure/src/test/resources/test-data1.jsonFile
@@ -0,0 +1,31 @@
+[{
+ "Year": "2013",
+ "First Name": "DAVID",
+ "County": "KINGS",
+ "Sex": "M",
+ "Count": "272"
+}, {
+ "Year": "2013",
+ "First Name": "JAYDEN",
+ "County": "KINGS",
+ "Sex": "M",
+ "Count": "268"
+}, {
+ "Year": "2013",
+ "First Name": "JAYDEN",
+ "County": "QUEENS",
+ "Sex": "M",
+ "Count": "219"
+}, {
+ "Year": "2013",
+ "First Name": "MOSHE",
+ "County": "KINGS",
+ "Sex": "M",
+ "Count": "219"
+}, {
+ "Year": "2013",
+ "First Name": "ETHAN",
+ "County": "QUEENS",
+ "Sex": "M",
+ "Count": "216"
+}]
\ No newline at end of file
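This fixture needs the same whole-file treatment, and its field names contain spaces, so SQL references must be backtick-quoted. A short usage sketch (assumes the hypothetical readJsonArrayFile helper sketched above; Count is a string in the data, hence the cast):

    val names = readJsonArrayFile(sqlContext, "src/test/resources/test-data1.jsonFile")
    names.registerTempTable("names")
    sqlContext.sql(
      "SELECT `First Name`, County, Count FROM names " +
      "WHERE Year = '2013' ORDER BY cast(Count AS int) DESC").show()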
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
deleted file mode 100644
index 6a60326..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchAccuracyAlgoTest.scala
+++ /dev/null
@@ -1,198 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-// val envFile = "src/test/resources/env.json"
-// val confFile = "src/test/resources/config.json"
-//// val confFile = "{\"name\":\"accu1\",\"type\":\"accuracy\",\"source\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_src.avro\"}},\"target\":{\"type\":\"avro\",\"version\":\"1.7\",\"config\":{\"file.name\":\"src/test/resources/users_info_target.avro\"}},\"evaluateRule\":{\"sampleRatio\":1,\"rules\":\"$source.user_id + 5 = $target.user_id + (2 + 3) AND $source.first_name + 12 = $target.first_name + (10 + 2) AND $source.last_name = $target.last_name AND $source.address = $target.address AND $source.email = $target.email AND $source.phone = $target.phone AND $source.post_code = $target.post_code AND (15 OR true) WHEN true AND $source.user_id > 10020\"}}"
-// val envFsType = "local"
-// val userFsType = "local"
-//
-// val args = Array(envFile, confFile)
-//
-// var sc: SparkContext = _
-// var sqlContext: SQLContext = _
-//
-// var allParam: AllParam = _
-//
-// before {
-// // read param files
-// val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// val userParam = readParamFile[UserParam](confFile, userFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// allParam = AllParam(envParam, userParam)
-//
-// // validate param files
-// validateParams(allParam) match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-3)
-// }
-// case _ => {
-// info("params validation pass")
-// }
-// }
-//
-// val metricName = userParam.name
-// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-// sc = new SparkContext(conf)
-// sqlContext = new SQLContext(sc)
-// }
-//
-// test("algorithm") {
-// Try {
-// val envParam = allParam.envParam
-// val userParam = allParam.userParam
-//
-// // start time
-// val startTime = new Date().getTime()
-//
-// // get spark application id
-// val applicationId = sc.applicationId
-//
-// // rules
-// val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-// val rule: StatementExpr = ruleFactory.generateRule()
-// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-// ruleAnalyzer.constCacheExprs.foreach(println)
-// ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-// // global cache data
-// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-// val finalConstMap = finalConstExprValueMap.headOption match {
-// case Some(m) => m
-// case _ => Map[String, Any]()
-// }
-//
-// // data connector
-// val sourceDataConnector: DirectDataConnector =
-// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-// ruleAnalyzer.sourceRuleExprs, finalConstMap
-// ) match {
-// case Success(cntr) => {
-// if (cntr.available) cntr
-// else throw new Exception("source data not available!")
-// }
-// case Failure(ex) => throw ex
-// }
-// val targetDataConnector: DirectDataConnector =
-// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.targetParam,
-// ruleAnalyzer.targetRuleExprs, finalConstMap
-// ) match {
-// case Success(cntr) => {
-// if (cntr.available) cntr
-// else throw new Exception("target data not available!")
-// }
-// case Failure(ex) => throw ex
-// }
-//
-// // get metadata
-//// val sourceMetaData: Iterable[(String, String)] = sourceDataConnector.metaData() match {
-//// case Success(md) => md
-//// case Failure(ex) => throw ex
-//// }
-//// val targetMetaData: Iterable[(String, String)] = targetDataConnector.metaData() match {
-//// case Success(md) => md
-//// case Failure(ex) => throw ex
-//// }
-//
-// // get data
-// val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-// case Success(dt) => dt
-// case Failure(ex) => throw ex
-// }
-// val targetData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = targetDataConnector.data() match {
-// case Success(dt) => dt
-// case Failure(ex) => throw ex
-// }
-//
-// // my algo
-// val algo = BatchAccuracyAlgo(allParam)
-//
-// // accuracy algorithm
-// val (accuResult, missingRdd, matchedRdd) = algo.accuracy(sourceData, targetData, ruleAnalyzer)
-//
-// println(s"match percentage: ${accuResult.matchPercentage}, total count: ${accuResult.total}")
-//
-// missingRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs, ruleAnalyzer.targetRuleExprs.persistExprs)).foreach(println)
-//
-// // end time
-// val endTime = new Date().getTime
-// println(s"using time: ${endTime - startTime} ms")
-// } match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-4)
-// }
-// case _ => {
-// info("calculation finished")
-// }
-// }
-// }
-//
-// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-// val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-// paramReader.readConfig[T]
-// }
-//
-// private def validateParams(allParam: AllParam): Try[Boolean] = {
-// val allParamValidator = AllParamValidator()
-// allParamValidator.validate(allParam)
-// }
-//
-//}
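The deleted suite above (and its siblings below) repeats the same Try-unwrapping idiom for reading and validating params. A condensed sketch of that pattern with a hypothetical orExit helper (exit codes follow the tests; the log function stands in for the Loggable error method used there):

    import scala.util.{Failure, Success, Try}

    def orExit[T](t: Try[T], code: Int, log: String => Unit): T = t match {
      case Success(v)  => v
      case Failure(ex) => log(ex.getMessage); sys.exit(code)
    }

    // e.g. val envParam = orExit(readParamFile[EnvParam](envFile, envFsType), -2, error)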
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
deleted file mode 100644
index e0f500a..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/BatchProfileAlgoTest.scala
+++ /dev/null
@@ -1,173 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.algo.batch.BatchProfileAlgo
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class BatchProfileAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-// val envFile = "src/test/resources/env.json"
-// val confFile = "src/test/resources/config-profile.json"
-// val envFsType = "local"
-// val userFsType = "local"
-//
-// val args = Array(envFile, confFile)
-//
-// var sc: SparkContext = _
-// var sqlContext: SQLContext = _
-//
-// var allParam: AllParam = _
-//
-// before {
-// // read param files
-// val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// val userParam = readParamFile[UserParam](confFile, userFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// allParam = AllParam(envParam, userParam)
-//
-// // validate param files
-// validateParams(allParam) match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-3)
-// }
-// case _ => {
-// info("params validation pass")
-// }
-// }
-//
-// val metricName = userParam.name
-// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-// sc = new SparkContext(conf)
-// sqlContext = new SQLContext(sc)
-// }
-//
-// test("algorithm") {
-// Try {
-// val envParam = allParam.envParam
-// val userParam = allParam.userParam
-//
-// // start time
-// val startTime = new Date().getTime()
-//
-// // get spark application id
-// val applicationId = sc.applicationId
-//
-// // rules
-// val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-// val rule: StatementExpr = ruleFactory.generateRule()
-// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-// ruleAnalyzer.constCacheExprs.foreach(println)
-// ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-// // global cache data
-// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-// val finalConstMap = finalConstExprValueMap.headOption match {
-// case Some(m) => m
-// case _ => Map[String, Any]()
-// }
-//
-// // data connector
-// val sourceDataConnector: DirectDataConnector =
-// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-// ruleAnalyzer.sourceRuleExprs, finalConstMap
-// ) match {
-// case Success(cntr) => {
-// if (cntr.available) cntr
-// else throw new Exception("source data not available!")
-// }
-// case Failure(ex) => throw ex
-// }
-//
-// // get data
-// val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-// case Success(dt) => dt
-// case Failure(ex) => throw ex
-// }
-//
-// // my algo
-// val algo = BatchProfileAlgo(allParam)
-//
-// // profile algorithm
-// val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
-//
-// println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
-//
-// matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
-//
-// // end time
-// val endTime = new Date().getTime
-// println(s"using time: ${endTime - startTime} ms")
-// } match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-4)
-// }
-// case _ => {
-// info("calculation finished")
-// }
-// }
-// }
-//
-// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-// val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-// paramReader.readConfig[T]
-// }
-//
-// private def validateParams(allParam: AllParam): Try[Boolean] = {
-// val allParamValidator = AllParamValidator()
-// allParamValidator.validate(allParam)
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
deleted file mode 100644
index a76712f..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/batch/DataFrameSaveTest.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo.batch
-//
-//import java.util.Date
-//
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.DataConnectorFactory
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class DataFrameSaveTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-// val envFile = "src/test/resources/env.json"
-// val confFile = "src/test/resources/config-profile.json"
-// val envFsType = "local"
-// val userFsType = "local"
-//
-// val args = Array(envFile, confFile)
-//
-// var sc: SparkContext = _
-// var sqlContext: SQLContext = _
-//
-// var allParam: AllParam = _
-//
-// before {
-// // read param files
-// val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// val userParam = readParamFile[UserParam](confFile, userFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// allParam = AllParam(envParam, userParam)
-//
-// // validate param files
-// validateParams(allParam) match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-3)
-// }
-// case _ => {
-// info("params validation pass")
-// }
-// }
-//
-// val metricName = userParam.name
-// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-// sc = new SparkContext(conf)
-// sqlContext = new SQLContext(sc)
-// }
-//
-// test("algorithm") {
-// Try {
-// val envParam = allParam.envParam
-// val userParam = allParam.userParam
-//
-// // start time
-// val startTime = new Date().getTime()
-//
-// // get spark application id
-// val applicationId = sc.applicationId
-//
-// // rules
-// val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-// val rule: StatementExpr = ruleFactory.generateRule()
-// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-// ruleAnalyzer.constCacheExprs.foreach(println)
-// ruleAnalyzer.constFinalCacheExprs.foreach(println)
-//
-// // global cache data
-// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-// val finalConstMap = finalConstExprValueMap.headOption match {
-// case Some(m) => m
-// case _ => Map[String, Any]()
-// }
-//
-// // data connector
-// val sourceDataConnector: DirectDataConnector =
-// DataConnectorFactory.getDirectDataConnector(sqlContext, null, userParam.sourceParam,
-// ruleAnalyzer.sourceRuleExprs, finalConstMap
-// ) match {
-// case Success(cntr) => {
-// if (cntr.available) cntr
-// else throw new Exception("source data not available!")
-// }
-// case Failure(ex) => throw ex
-// }
-//
-// // get data
-// val sourceData: RDD[(Product, (Map[String, Any], Map[String, Any]))] = sourceDataConnector.data() match {
-// case Success(dt) => dt
-// case Failure(ex) => throw ex
-// }
-//
-// // my algo
-// val algo = BatchProfileAlgo(allParam)
-//
-// // profile algorithm
-// val (profileResult, missingRdd, matchedRdd) = algo.profile(sourceData, ruleAnalyzer)
-//
-// println(s"match percentage: ${profileResult.matchPercentage}, match count: ${profileResult.matchCount}, total count: ${profileResult.totalCount}")
-//
-// matchedRdd.map(rec => algo.record2String(rec, ruleAnalyzer.sourceRuleExprs.persistExprs)).foreach(println)
-//
-// // end time
-// val endTime = new Date().getTime
-// println(s"using time: ${endTime - startTime} ms")
-// } match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-4)
-// }
-// case _ => {
-// info("calculation finished")
-// }
-// }
-// }
-//
-// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-// val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-// paramReader.readConfig[T]
-// }
-//
-// private def validateParams(allParam: AllParam): Try[Boolean] = {
-// val allParamValidator = AllParamValidator()
-// allParamValidator.validate(allParam)
-// }
-//
-//}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
deleted file mode 100644
index 2179fba..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/core/AccuracyCoreTest.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.scalatest.PrivateMethodTester
-
-@RunWith(classOf[JUnitRunner])
-class AccuracyCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
-
- def findExprId(exprs: Iterable[Expr], desc: String): String = {
- exprs.find(_.desc == desc) match {
- case Some(expr) => expr._id
- case _ => ""
- }
- }
-
- test ("match data success") {
- val rule = "$source.name = $target.name AND $source.age < $target.age"
- val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
- val ruleFactory = RuleFactory(evaluateRuleParam)
- val statement = ruleFactory.generateRule
- val ruleAnalyzer = RuleAnalyzer(statement)
-
- val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
- val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
- val source = (Map[String, Any](
- (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
- (findExprId(sourcePersistExprs, "$source['age']") -> 26)
- ), Map[String, Any]())
- val target = (Map[String, Any](
- (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
- (findExprId(targetPersistExprs, "$target['age']") -> 27)
- ), Map[String, Any]())
-
- val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
- val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
- result._1 should be (true)
- result._2.size should be (0)
- }
-
- test ("match data fail") {
- val rule = "$source.name = $target.name AND $source.age = $target.age"
- val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
- val ruleFactory = RuleFactory(evaluateRuleParam)
- val statement = ruleFactory.generateRule
- val ruleAnalyzer = RuleAnalyzer(statement)
-
- val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
- val targetPersistExprs = ruleAnalyzer.targetRuleExprs.persistExprs
-
- val source = (Map[String, Any](
- (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
- (findExprId(sourcePersistExprs, "$source['age']") -> 26)
- ), Map[String, Any]())
- val target = (Map[String, Any](
- (findExprId(targetPersistExprs, "$target['name']") -> "jack"),
- (findExprId(targetPersistExprs, "$target['age']") -> 27)
- ), Map[String, Any]())
-
- val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
- val result = AccuracyCore invokePrivate matchData(source, target, ruleAnalyzer)
- result._1 should be (false)
- result._2.size shouldNot be (0)
- }
-
-}
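The suite above reaches the private AccuracyCore.matchData through ScalaTest's PrivateMethodTester. A self-contained sketch of that mechanism on a toy object (the symbol literal names the private method; invokePrivate dispatches reflectively):

    import org.scalatest.{FunSuite, PrivateMethodTester}

    object Adder { private def add(a: Int, b: Int): Int = a + b }

    class AdderSpec extends FunSuite with PrivateMethodTester {
      test("invoke a private method reflectively") {
        val add = PrivateMethod[Int]('add)
        assert((Adder invokePrivate add(2, 3)) === 5)
      }
    }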
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
deleted file mode 100644
index 087e8e5..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/core/ProfileCoreTest.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.algo.core
-
-import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.rule.expr._
-import org.apache.griffin.measure.rule.{RuleAnalyzer, RuleFactory}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-import org.scalatest.PrivateMethodTester
-
-@RunWith(classOf[JUnitRunner])
-class ProfileCoreTest extends FunSuite with Matchers with BeforeAndAfter with PrivateMethodTester {
-
- def findExprId(exprs: Iterable[Expr], desc: String): String = {
- exprs.find(_.desc == desc) match {
- case Some(expr) => expr._id
- case _ => ""
- }
- }
-
- test ("match data success") {
- val rule = "$source.name = 'jack' AND $source.age = null"
- val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
- val ruleFactory = RuleFactory(evaluateRuleParam)
- val statement = ruleFactory.generateRule
- val ruleAnalyzer = RuleAnalyzer(statement)
-
- val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-
- val source = (Map[String, Any](
- (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
- (findExprId(sourcePersistExprs, "$source['age']") -> null)
- ), Map[String, Any]())
-
- val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
- val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
- result._1 should be (true)
- result._2.size should be (0)
- }
-
- test ("match data fail") {
- val rule = "$source.name = 'jack' AND $source.age != null"
- val evaluateRuleParam = EvaluateRuleParam(1.0, rule)
- val ruleFactory = RuleFactory(evaluateRuleParam)
- val statement = ruleFactory.generateRule
- val ruleAnalyzer = RuleAnalyzer(statement)
-
- val sourcePersistExprs = ruleAnalyzer.sourceRuleExprs.persistExprs
-
- val source = (Map[String, Any](
- (findExprId(sourcePersistExprs, "$source['name']") -> "jack"),
- (findExprId(sourcePersistExprs, "$source['age']") -> null)
- ), Map[String, Any]())
-
- val matchData = PrivateMethod[(Boolean, Map[String, Any])]('matchData)
- val result = ProfileCore invokePrivate matchData(source, ruleAnalyzer)
- result._1 should be (false)
- result._2.size shouldNot be (0)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala b/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
deleted file mode 100644
index a22f91f..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/algo/streaming/StreamingAccuracyAlgoTest.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-///*
-//Licensed to the Apache Software Foundation (ASF) under one
-//or more contributor license agreements. See the NOTICE file
-//distributed with this work for additional information
-//regarding copyright ownership. The ASF licenses this file
-//to you under the Apache License, Version 2.0 (the
-//"License"); you may not use this file except in compliance
-//with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-//Unless required by applicable law or agreed to in writing,
-//software distributed under the License is distributed on an
-//"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-//KIND, either express or implied. See the License for the
-//specific language governing permissions and limitations
-//under the License.
-//*/
-//package org.apache.griffin.measure.algo.streaming
-//
-//import java.util.Date
-//import java.util.concurrent.TimeUnit
-//
-//import org.apache.griffin.measure.algo.batch.BatchAccuracyAlgo
-//import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-//import org.apache.griffin.measure.cache.result._
-//import org.apache.griffin.measure.config.params._
-//import org.apache.griffin.measure.config.params.env._
-//import org.apache.griffin.measure.config.params.user._
-//import org.apache.griffin.measure.config.reader._
-//import org.apache.griffin.measure.config.validator._
-//import org.apache.griffin.measure.connector.direct.DirectDataConnector
-//import org.apache.griffin.measure.connector.{DataConnector, DataConnectorFactory}
-//import org.apache.griffin.measure.log.Loggable
-//import org.apache.griffin.measure.persist.{Persist, PersistFactory, PersistType}
-//import org.apache.griffin.measure.result._
-//import org.apache.griffin.measure.rule.expr._
-//import org.apache.griffin.measure.rule.{ExprValueUtil, RuleAnalyzer, RuleFactory}
-//import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-//import org.apache.spark.rdd.RDD
-//import org.apache.spark.sql.SQLContext
-//import org.apache.spark.sql.hive.HiveContext
-//import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-//import org.apache.spark.{SparkConf, SparkContext}
-//import org.junit.runner.RunWith
-//import org.scalatest.junit.JUnitRunner
-//import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-//
-//import scala.util.{Failure, Success, Try}
-//
-//
-//@RunWith(classOf[JUnitRunner])
-//class StreamingAccuracyAlgoTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
-//
-// val envFile = "src/test/resources/env-streaming.json"
-// val confFile = "src/test/resources/config-streaming3.json"
-// val envFsType = "local"
-// val userFsType = "local"
-//
-// val args = Array(envFile, confFile)
-//
-// var sc: SparkContext = _
-// var sqlContext: SQLContext = _
-//// val ssc: StreamingContext = _
-//
-// var allParam: AllParam = _
-//
-// before {
-// // read param files
-// val envParam = readParamFile[EnvParam](envFile, envFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// val userParam = readParamFile[UserParam](confFile, userFsType) match {
-// case Success(p) => p
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-2)
-// }
-// }
-// allParam = AllParam(envParam, userParam)
-//
-// // validate param files
-// validateParams(allParam) match {
-// case Failure(ex) => {
-// error(ex.getMessage)
-// sys.exit(-3)
-// }
-// case _ => {
-// info("params validation pass")
-// }
-// }
-//
-// val metricName = userParam.name
-// val sparkParam = envParam.sparkParam
-// val conf = new SparkConf().setMaster("local[*]").setAppName(metricName)
-// conf.setAll(sparkParam.config)
-// sc = new SparkContext(conf)
-// sc.setLogLevel(envParam.sparkParam.logLevel)
-// sqlContext = new SQLContext(sc)
-//// sqlContext = new HiveContext(sc)
-//
-//// val a = sqlContext.sql("select * from s1 limit 10")
-//// // val a = sqlContext.sql("show tables")
-//// a.show(10)
-////
-//// val b = HdfsUtil.existPath("/griffin/streaming")
-//// println(b)
-// }
-//
-// test("algorithm") {
-// val envParam = allParam.envParam
-// val userParam = allParam.userParam
-// val metricName = userParam.name
-// val sparkParam = envParam.sparkParam
-// val cleanerParam = envParam.cleanerParam
-//
-//// val ssc = StreamingContext.getOrCreate(sparkParam.cpDir,
-//// ( ) => {
-//// try {
-//// val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
-//// case Some(interval) => Milliseconds(interval)
-//// case _ => throw new Exception("invalid batch interval")
-//// }
-//// val ssc = new StreamingContext(sc, batchInterval)
-//// ssc.checkpoint(sparkParam.cpDir)
-//// ssc
-//// } catch {
-//// case runtime: RuntimeException => {
-//// throw runtime
-//// }
-//// }
-//// })
-//
-// val batchInterval = TimeUtil.milliseconds(sparkParam.batchInterval) match {
-// case Some(interval) => Milliseconds(interval)
-// case _ => throw new Exception("invalid batch interval")
-// }
-// val ssc = new StreamingContext(sc, batchInterval)
-// ssc.checkpoint(sparkParam.cpDir)
-//
-// // start time
-// val startTime = new Date().getTime()
-//
-// val persistFactory = PersistFactory(envParam.persistParams, metricName)
-//
-// // get persists to persist measure result
-// val appPersist: Persist = persistFactory.getPersists(startTime)
-//
-// // get spark application id
-// val applicationId = sc.applicationId
-//
-// // persist start id
-// appPersist.start(applicationId)
-//
-// InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
-// InfoCacheInstance.init
-//
-// // generate rule from rule param, generate rule analyzer
-// val ruleFactory = RuleFactory(userParam.evaluateRuleParam)
-// val rule: StatementExpr = ruleFactory.generateRule()
-// val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-//
-// // const expr value map
-// val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
-// val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
-// val finalConstMap = finalConstExprValueMap.headOption match {
-// case Some(m) => m
-// case _ => Map[String, Any]()
-// }
-//
-// // data connector
-// val sourceDataConnector: DirectDataConnector =
-// DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.sourceParam,
-// ruleAnalyzer.sourceRuleExprs, finalConstMap
-// ) match {
-// case Success(cntr) => {
-// if (cntr.available) cntr
-// else throw new Exception("source data connection error!")
-// }
-// case Failure(ex) => throw ex
-// }
-// val targetDataConnector: DirectDataConnector =
-// DataConnectorFactory.getDirectDataConnector(sqlContext, ssc, userParam.targetParam,
-// ruleAnalyzer.targetRuleExprs, finalConstMap
-// ) match {
-// case Success(cntr) => {
-// if (cntr.available) cntr
-// else throw new Exception("target data connection error!")
-// }
-// case Failure(ex) => throw ex
-// }
-//
-// val cacheResultProcesser = CacheResultProcesser()
-//
-// // init data stream
-// sourceDataConnector.init()
-// targetDataConnector.init()
-//
-// // my algo
-// val algo = StreamingAccuracyAlgo(allParam)
-//
-// val streamingAccuracyProcess = StreamingAccuracyProcess(
-// sourceDataConnector, targetDataConnector,
-// ruleAnalyzer, cacheResultProcesser, persistFactory, appPersist)
-//
-// val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
-// case Some(interval) => interval
-// case _ => throw new Exception("invalid batch interval")
-// }
-// val process = TimingProcess(processInterval, streamingAccuracyProcess)
-//
-// // clean thread
-//// case class Clean() extends Runnable {
-//// val lock = InfoCacheInstance.genLock("clean")
-//// def run(): Unit = {
-//// val locked = lock.lock(5, TimeUnit.SECONDS)
-//// if (locked) {
-//// try {
-//// sourceDataConnector.cleanData
-//// targetDataConnector.cleanData
-//// } finally {
-//// lock.unlock()
-//// }
-//// }
-//// }
-//// }
-//// val cleanInterval = TimeUtil.milliseconds(cleanerParam.cleanInterval) match {
-//// case Some(interval) => interval
-//// case _ => throw new Exception("invalid batch interval")
-//// }
-//// val clean = TimingProcess(cleanInterval, Clean())
-//
-// process.startup()
-//// clean.startup()
-//
-// ssc.start()
-// ssc.awaitTermination()
-// ssc.stop(stopSparkContext=true, stopGracefully=true)
-//
-// println("================ end ================")
-//
-// // context stop
-// sc.stop
-//
-// InfoCacheInstance.close
-//
-// appPersist.finish()
-//
-// process.shutdown()
-//// clean.shutdown()
-// }
-//
-// private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
-// val paramReader = ParamReaderFactory.getParamReader(file, fsType)
-// paramReader.readConfig[T]
-// }
-//
-// private def validateParams(allParam: AllParam): Try[Boolean] = {
-// val allParamValidator = AllParamValidator()
-// allParamValidator.validate(allParam)
-// }
-//
-//}
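Most of the streaming suite above is hand-built StreamingContext plumbing. A minimal sketch of that setup (Spark 1.x streaming API; the interval and checkpoint path are placeholders standing in for the sparkParam values):

    import org.apache.spark.streaming.{Milliseconds, StreamingContext}

    val batchInterval = Milliseconds(2000)        // e.g. parsed via TimeUtil.milliseconds
    val ssc = new StreamingContext(sc, batchInterval)
    ssc.checkpoint("/tmp/griffin/cp")             // placeholder checkpoint dir
    // ... register DStreams / start timed processes, then:
    ssc.start()
    ssc.awaitTermination()
    ssc.stop(stopSparkContext = true, stopGracefully = true)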
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
index b3c94e5..9e5d380 100644
--- a/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
+++ b/measure/src/test/scala/org/apache/griffin/measure/config/reader/ParamRawStringReaderTest.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
@RunWith(classOf[JUnitRunner])
class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfter {
- test("read config") {
+ test("read raw config") {
val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
val reader = ParamRawStringReader(rawString)
@@ -34,5 +34,4 @@ class ParamRawStringReaderTest extends FunSuite with Matchers with BeforeAndAfte
paramTry.isSuccess should be (true)
paramTry.get should be (PersistParam("hdfs", Map[String, Any](("path" -> "/path/to"), ("time" -> 1234567))))
}
-
}
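A usage sketch mirroring the assertion above: the raw-string reader parses an inline JSON config into a typed param, with failures surfacing as a Failure rather than an exception (readConfig[T] taking an implicit Manifest is an assumption carried over from the other suites in this commit):

    val rawString = """{"type": "hdfs", "config": {"path": "/path/to", "time": 1234567}}"""
    val paramTry = ParamRawStringReader(rawString).readConfig[PersistParam]
    paramTry.isSuccess should be (true)
    paramTry.foreach(println)         // PersistParam("hdfs", Map(...))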
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
deleted file mode 100644
index 2139ff7..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/connector/ConnectorTest.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.algo.streaming.TimingProcess
-import org.apache.griffin.measure.cache.info.InfoCacheInstance
-import org.apache.griffin.measure.config.params.env._
-import org.apache.griffin.measure.config.params.user.{DataCacheParam, DataConnectorParam, EvaluateRuleParam}
-import org.apache.griffin.measure.config.reader.ParamRawStringReader
-import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr}
-import org.apache.griffin.measure.rule._
-import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
-import org.apache.griffin.measure.rule.{DataTypeCalculationUtil, ExprValueUtil, RuleExprs}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DataType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-import org.apache.spark.streaming.{Milliseconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-import scala.reflect.ClassTag
-import scala.util.{Failure, Success, Try}
-
-@RunWith(classOf[JUnitRunner])
-class ConnectorTest extends FunSuite with Matchers with BeforeAndAfter {
-
- test("read config") {
-
- val a = "java.lang.String"
- val at = getClassTag(a)
- println(at)
-
- at match {
- case ClassTag(m) => println(m)
- case _ => println("no")
- }
-
- }
-
- private def getClassTag(tp: String): ClassTag[_] = {
- val clazz = Class.forName(tp)
- ClassTag(clazz)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala b/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala
new file mode 100644
index 0000000..ead84f7
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/data/connector/ConnectorTest.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector
+
+import java.util.Date
+import java.util.concurrent.TimeUnit
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.cache.info.InfoCacheInstance
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user.{DataConnectorParam, EvaluateRuleParam}
+import org.apache.griffin.measure.config.reader.ParamRawStringReader
+import org.apache.griffin.measure.data.connector.batch.TextDirBatchDataConnector
+import org.apache.griffin.measure.process.TimingProcess
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
+import org.apache.griffin.measure.rule._
+import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, TimeUtil}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.reflect.ClassTag
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class ConnectorTest extends FunSuite with Matchers with BeforeAndAfter {
+
+ test("read config") {
+
+ val a = "java.lang.String"
+ val at = getClassTag(a)
+ println(at)
+
+ at match {
+ case ClassTag(m) => println(m)
+ case _ => println("no")
+ }
+
+ }
+
+ private def getClassTag(tp: String): ClassTag[_] = {
+ val clazz = Class.forName(tp)
+ ClassTag(clazz)
+ }
+}
+
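Despite its "read config" title, the test body above exercises ClassTag construction from a fully qualified class name. A sketch of what that enables, e.g. building a correctly typed array at runtime (ClassTag.newArray is a standard library method):

    import scala.reflect.ClassTag

    def newArrayOf(className: String, n: Int): Array[_] = {
      val tag: ClassTag[_] = ClassTag(Class.forName(className))
      tag.newArray(n)                 // element type decided at runtime
    }

    // newArrayOf("java.lang.String", 3) -> Array[String] of length 3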
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
new file mode 100644
index 0000000..a1e4854
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/BatchProcessTest.scala
@@ -0,0 +1,146 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.reader.ParamReaderFactory
+import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.PersistThreadPool
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class BatchProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+ val envFile = "src/test/resources/env-test.json"
+ val confFile = "src/test/resources/config-test-profiling.json"
+// val confFile = "src/test/resources/config-test-accuracy.json"
+
+ val envFsType = "local"
+ val userFsType = "local"
+
+ val args = Array(envFile, confFile)
+
+ var allParam: AllParam = _
+
+ before {
+ // read param files
+ val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+ case Success(p) => p
+ case Failure(ex) => {
+ error(ex.getMessage)
+ sys.exit(-2)
+ }
+ }
+ val userParam = readParamFile[UserParam](confFile, userFsType) match {
+ case Success(p) => p
+ case Failure(ex) => {
+ error(ex.getMessage)
+ sys.exit(-2)
+ }
+ }
+ allParam = AllParam(envParam, userParam)
+
+ // validate param files
+ validateParams(allParam) match {
+ case Failure(ex) => {
+ error(ex.getMessage)
+ sys.exit(-3)
+ }
+ case _ => {
+ info("params validation pass")
+ }
+ }
+ }
+
+ test ("batch process") {
+ val procType = ProcessType(allParam.userParam.procType)
+ val proc: DqProcess = procType match {
+ case BatchProcessType => BatchDqProcess(allParam)
+ case StreamingProcessType => StreamingDqProcess(allParam)
+ case _ => {
+ error(s"${procType} is unsupported process type!")
+ sys.exit(-4)
+ }
+ }
+
+ // process init
+ proc.init match {
+ case Success(_) => {
+ info("process init success")
+ }
+ case Failure(ex) => {
+ error(s"process init error: ${ex.getMessage}")
+ shutdown
+ sys.exit(-5)
+ }
+ }
+
+ // process run
+ proc.run match {
+ case Success(_) => {
+ info("process run success")
+ }
+ case Failure(ex) => {
+ error(s"process run error: ${ex.getMessage}")
+
+ if (proc.retriable) {
+ throw ex
+ } else {
+ shutdown
+ sys.exit(-5)
+ }
+ }
+ }
+
+ // process end
+ proc.end match {
+ case Success(_) => {
+ info("process end success")
+ }
+ case Failure(ex) => {
+ error(s"process end error: ${ex.getMessage}")
+ shutdown
+ sys.exit(-5)
+ }
+ }
+
+ shutdown
+ }
+
+ private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+ val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+ paramReader.readConfig[T]
+ }
+
+ private def validateParams(allParam: AllParam): Try[Boolean] = {
+ val allParamValidator = AllParamValidator()
+ allParamValidator.validate(allParam)
+ }
+
+ private def shutdown(): Unit = {
+ PersistThreadPool.shutdown
+ }
+}
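The test above drives the DqProcess lifecycle in three guarded steps (init, run, end). A condensed sketch of that flow with a hypothetical step helper (Try imports as in the test; -5 matches the test's exit code):

    def step(name: String, t: Try[_]): Unit = t match {
      case Success(_)  => info(s"process $name success")
      case Failure(ex) =>
        error(s"process $name error: ${ex.getMessage}")
        shutdown
        sys.exit(-5)
    }

    step("init", proc.init)
    step("run", proc.run)     // the real test additionally rethrows when proc.retriable
    step("end", proc.end)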
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
new file mode 100644
index 0000000..b119d76
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/JsonParseTest.scala
@@ -0,0 +1,531 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.config.reader.ParamReaderFactory
+import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.PersistThreadPool
+import org.apache.griffin.measure.process.engine.DataFrameOprs
+import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
+import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.types._
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.collection.mutable.WrappedArray
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class JsonParseTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+ var sparkContext: SparkContext = _
+ var sqlContext: SQLContext = _
+
+ before {
+ val conf = new SparkConf().setAppName("test json").setMaster("local[*]")
+ sparkContext = new SparkContext(conf)
+ sparkContext.setLogLevel("WARN")
+// sqlContext = new HiveContext(sparkContext)
+ sqlContext = new SQLContext(sparkContext)
+ }
+
+ test ("json test") {
+ // 0. prepare data
+// val dt =
+// """
+// |{"name": "s1", "age": 12, "items": [1, 2, 3],
+// |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}],
+// |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}"
+// |}""".stripMargin
+// val rdd0 = sparkContext.parallelize(Seq(dt)).map(Row(_))
+ val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
+
+ val vtp = StructField("value", StringType)
+ val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
+ df0.registerTempTable("src")
+
+// val fromJson2Array = (s: String) => {
+// JsonUtil.fromJson[Seq[String]](s)
+// }
+// sqlContext.udf.register("from_json_to_array", fromJson2Array)
+//
+// val df2 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as value FROM src")
+// df2.printSchema
+// df2.show(10)
+// df2.registerTempTable("df2")
+
+ // 1. read from json string to extracted json row
+// val readSql = "SELECT value FROM src"
+// val df = sqlContext.sql(readSql)
+// val df = sqlContext.table("src")
+// val rdd = df.map { row =>
+// row.getAs[String]("value")
+// }
+// val df1 = sqlContext.read.json(rdd)
+// df1.printSchema
+// df1.show(10)
+// df1.registerTempTable("df1")
+ val details = Map[String, Any]("df.name" -> "src")
+ val df1 = DataFrameOprs.fromJson(sqlContext, details)
+ df1.registerTempTable("df1")
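+ // (note: DataFrameOprs.fromJson is assumed here to parse the string column
+ // of table "src" as JSON with an inferred schema, i.e. the commented-out
+ // sqlContext.read.json(rdd) path above packaged as a reusable operation)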
+
+ // 2. extract json array into lines
+// val rdd2 = df1.flatMap { row =>
+// row.getAs[WrappedArray[String]]("seeds")
+// }
+// val df2 = sqlContext.read.json(rdd2)
+ val df2 = sqlContext.sql("select explode(seeds) as value from df1")
+// val tdf = sqlContext.sql("select name, age, explode(items) as item from df1")
+// tdf.registerTempTable("tdf")
+// val df2 = sqlContext.sql("select struct(name, age, item) as ttt from tdf")
+ df2.printSchema
+ df2.show(10)
+ df2.registerTempTable("df2")
+ println(df2.count)
+
+ val sql1 = "SELECT value FROM df2"
+ val df22 = sqlContext.sql(sql1)
+ val rdd22 = df22.map { row =>
+ row.getAs[String]("value")
+ }
+ import org.apache.spark.sql.functions._
+ val df23 = sqlContext.read.json(rdd22)
+ df23.registerTempTable("df23")
+// df23.withColumn("par", monotonicallyIncreasingId)
+
+ val df24 = sqlContext.sql("SELECT url, cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df23")
+ df24.printSchema
+ df24.show(10)
+ df24.registerTempTable("df24")
+ println(df24.count)
+
+// val df25 = sqlContext.sql("select ")
+
+//
+// // 3. extract json string into row
+//// val df3 = sqlContext.sql("select cast(get_json_object(metadata, '$.tracker.crawlRequestCreateTS') as bigint), url from df2")
+// val df3 = sqlContext.sql("select cast(get_json_object(get_json_object(value, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint), get_json_object(value, '$.url') from df2")
+// df3.printSchema()
+// df3.show(10)
+// println(df3.count)
+
+// val df5 = sqlContext.sql("select get_json_object(value, '$.subs') as subs from src")
+// df5.printSchema()
+// df5.show(10)
+// df5.registerTempTable("df5")
+// val rdd5 = df5.map { row =>
+// row.getAs[String]("subs")
+// }
+// val df6 = sqlContext.read.json(rdd5)
+// df6.printSchema
+// df6.show(10)
+
+ // alternative: extract json string to row (commented-out experiment)
+// val df2 = sqlContext.sql("select jstr from df1")
+// val rdd2 = df2.map { row =>
+// row.getAs[String]("jstr")
+// }
+// val df22 = sqlContext.read.json(rdd2)
+// df22.printSchema
+// df22.show(100)
+// df22.registerTempTable("df2")
+//
+// val df23 = sqlContext.sql("select json_tuple(jstr, 's1', 's2') from df1")
+// df23.printSchema()
+// df23.show(100)
+
+ // alternative: extract json array into lines (not pursued here)
+
+ // alternative: flatmap from json row to json row
+// val df3 = sqlContext.sql("select explode(subs) as sub, items from df1")
+// df3.printSchema()
+// df3.show(10)
+// df3.registerTempTable("df3")
+//
+// val df4 = sqlContext.sql("select explode(items) as item, sub from df3")
+// df4.printSchema()
+// df4.show(10)
+
+// sqlContext.udf.register("length", (s: WrappedArray[_]) => s.length)
+ //
+ // val df2 = sqlContext.sql("SELECT inner from df1")
+ // df2.registerTempTable("df2")
+ // df2.printSchema
+ // df2.show(100)
+
+// def children(colname: String, df: DataFrame): Array[DataFrame] = {
+// val parent = df.schema.fields.filter(_.name == colname).head
+// println(parent)
+// val fields: Array[StructField] = parent.dataType match {
+// case x: StructType => x.fields
+// case _ => Array.empty[StructField]
+// }
+// fields.map(x => col(s"$colname.${x.name}"))
+//// fields.foreach(println)
+// }
+////
+// children("inner", df2)
+//
+// df2.select(children("bar", df): _*).printSchema
+
+// val df3 = sqlContext.sql("select inline(subs) from df1")
+// df3.printSchema()
+// df3.show(100)
+
+// val rdd2 = df2.flatMap { row =>
+// row.getAs[GenericRowWithSchema]("inner") :: Nil
+// }
+//
+// rdd2.
+
+// val funcs = sqlContext.sql("show functions")
+// funcs.printSchema()
+// funcs.show(1000)
+//
+// val desc = sqlContext.sql("describe function inline")
+// desc.printSchema()
+// desc.show(100)
+
+ }
+
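+ // A minimal sketch of the recurring pattern above (helper name is
+ // illustrative, not part of the original tests): turn a single string
+ // column of JSON lines into a schema-inferred DataFrame via Spark 1.x's
+ // read.json(RDD[String]).
+ private def jsonColumnToDf(table: String, column: String): DataFrame = {
+ sqlContext.read.json(sqlContext.table(table).map(_.getAs[String](column)))
+ }
+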
+ test ("json test 2") {
+ val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
+
+ val vtp = StructField("value", StringType)
+ val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
+ df0.registerTempTable("tgt")
+
+// val fromJson2StringArray = (s: String) => {
+// val seq = JsonUtil.fromJson[Seq[Any]](s)
+// seq.map(i => JsonUtil.toJson(i))
+// }
+// sqlContext.udf.register("from_json_to_string_array", fromJson2StringArray)
+//
+// val df2 = sqlContext.sql("SELECT from_json_to_string_array(get_json_object(value, '$.groups[0].attrsList')) as value FROM tgt")
+// df2.printSchema()
+// df2.show(10)
+// df2.registerTempTable("df2")
+//
+// val indexOfStringArray = (sa: String, )
+
+
+ // 1. read from json string to extracted json row
+ val readSql = "SELECT value FROM tgt"
+ val df = sqlContext.sql(readSql)
+ val rdd = df.map { row =>
+ row.getAs[String]("value")
+ }
+ val df1 = sqlContext.read.json(rdd)
+ df1.printSchema
+ df1.show(10)
+ df1.registerTempTable("df1")
+
+
+ val df2 = sqlContext.sql("select groups[0].attrsList as attrs from df1")
+ df2.printSchema
+ df2.show(10)
+ df2.registerTempTable("df2")
+ println(df2.count)
+
+ val indexOf = (arr: Seq[String], v: String) => {
+ arr.indexOf(v)
+ }
+ sqlContext.udf.register("index_of", indexOf)
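+
+ // note: with "attrs" as an array of structs, attrs.name resolves to the
+ // array of name fields, so index_of(attrs.name, 'URL') gives the position
+ // of the URL attribute and attrs.values[...] selects its values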
+
+ val df3 = sqlContext.sql("select attrs.values[index_of(attrs.name, 'URL')][0] as url, cast(get_json_object(attrs.values[index_of(attrs.name, 'CRAWLMETADATA')][0], '$.tracker.crawlRequestCreateTS') as bigint) as ts from df2")
+ df3.printSchema()
+ df3.show(10)
+ df3.registerTempTable("df3")
+ }
+
+ test ("testing") {
+ val dt =
+ """
+ |{"name": "age", "age": 12, "items": [1, 2, 3],
+ |"subs": [{"id": 1, "type": "seed"}, {"id": 2, "type": "frog"}],
+ |"inner": {"a": 1, "b": 2}, "jstr": "{\"s1\": \"aaa\", \"s2\": 123}", "b": true
+ |}""".stripMargin
+ val rdd = sparkContext.parallelize(Seq(dt)).map(Row(_))
+ val vtp = StructField("value", StringType)
+ val df = sqlContext.createDataFrame(rdd, StructType(Array(vtp)))
+ df.registerTempTable("df")
+
+ val df1 = sqlContext.read.json(sqlContext.sql("select * from df").map(r => r.getAs[String]("value")))
+ df1.printSchema()
+ df1.show(10)
+ df1.registerTempTable("df1")
+
+ val test = (s: String) => {
+ s.toInt
+ }
+ sqlContext.udf.register("to_int", test)
+
+ val df2 = sqlContext.sql("select (b) as aa, inner.a from df1 where age = to_int(\"12\")")
+ df2.printSchema()
+ df2.show(10)
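+ // with the sample record above (age = 12, b = true, inner.a = 1), this
+ // query should return exactly one row: aa = true, a = 1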
+ }
+
+ test ("test input only sql") {
+ val rdd0 = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
+
+ val vtp = StructField("value", StringType)
+ val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
+ df0.registerTempTable("src")
+ df0.show(10)
+
+ // 1. read from json string to extracted json row
+ val df1 = sqlContext.sql("SELECT get_json_object(value, '$.seeds') as seeds FROM src")
+ df1.printSchema
+ df1.show(10)
+ df1.registerTempTable("df1")
+
+ val json2StringArray: (String) => Seq[String] = (s: String) => {
+ val seq = JsonUtil.fromJson[Seq[String]](s)
+// seq.map(i => JsonUtil.toJson(i))
+ seq
+ }
+ sqlContext.udf.register("json_to_string_array", json2StringArray)
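+ // (assumes each element of the seeds array is itself a JSON string; the
+ // "test from json" case below also handles object elements by
+ // re-serializing them before use)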
+
+ val df2 = sqlContext.sql("SELECT explode(json_to_string_array(seeds)) as seed FROM df1")
+ df2.printSchema
+ df2.show(10)
+ df2.registerTempTable("df2")
+
+
+ val df3 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM df2")
+ df3.printSchema
+ df3.show(10)
+ }
+
+ test ("test output only sql") {
+ val rdd0 = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
+
+ val vtp = StructField("value", StringType)
+ val df0 = sqlContext.createDataFrame(rdd0, StructType(Array(vtp)))
+ df0.registerTempTable("tgt")
+ df0.printSchema()
+ df0.show(10)
+
+ val json2StringArray: (String) => Seq[String] = (s: String) => {
+ JsonUtil.fromJson[Seq[String]](s)
+ }
+ sqlContext.udf.register("json_to_string_array", json2StringArray)
+
+ val json2StringJsonArray: (String) => Seq[String] = (s: String) => {
+ val seq = JsonUtil.fromJson[Seq[Any]](s)
+ seq.map(i => JsonUtil.toJson(i))
+ }
+ sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray)
+
+ val indexOf = (arr: Seq[String], v: String) => {
+ arr.indexOf(v)
+ }
+ sqlContext.udf.register("index_of", indexOf)
+
+ val indexOfField = (arr: Seq[String], k: String, v: String) => {
+ val seq = arr.flatMap { item =>
+ JsonUtil.fromJson[Map[String, Any]](item).get(k)
+ }
+ seq.indexOf(v)
+ }
+ sqlContext.udf.register("index_of_field", indexOfField)
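+
+ // index_of_field usage (illustrative): for
+ // attrs = Seq("""{"name":"URL","values":["u"]}""", """{"name":"X"}"""),
+ // index_of_field(attrs, 'name', 'URL') evaluates to 0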
+
+ // 1. read from json string to extracted json row
+ val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tgt")
+ df1.printSchema
+ df1.show(10)
+ df1.registerTempTable("df1")
+
+ val df2 = sqlContext.sql("SELECT json_to_string_json_array(attrs) as attrs FROM df1")
+ df2.printSchema()
+ df2.show(10)
+ df2.registerTempTable("df2")
+
+ val df3 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM df2")
+ df3.printSchema()
+ df3.show(10)
+ df3.registerTempTable("df3")
+
+ val df4 = sqlContext.sql("SELECT json_to_string_array(get_json_object(attr1, '$.values'))[0], cast(get_json_object(json_to_string_array(get_json_object(attr2, '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) FROM df3")
+ df4.printSchema()
+ df4.show(10)
+ }
+
+ test ("test from json") {
+ val fromJson2Map = (str: String) => {
+ val a = JsonUtil.fromJson[Map[String, Any]](str)
+ // map (not mapValues) so the result is materialized: mapValues returns
+ // a lazy view, which is not serializable inside a Spark UDF
+ a.map { case (k, v) =>
+ k -> (v match {
+ case t: String => t
+ case _ => JsonUtil.toJson(v)
+ })
+ }
+ }
+ sqlContext.udf.register("from_json_to_map", fromJson2Map)
+
+ val fromJson2Array = (str: String) => {
+ JsonUtil.fromJson[Seq[Any]](str).map {
+ case t: String => t
+ case v => JsonUtil.toJson(v)
+ }
+ }
+ sqlContext.udf.register("from_json_to_array", fromJson2Array)
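+
+ // together these UDFs normalize heterogeneous JSON collections: string
+ // elements pass through unchanged, anything else (object, array, number)
+ // is re-serialized to a JSON string so get_json_object can consume it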
+
+ // ========================
+
+ val srdd = sparkContext.textFile("src/test/resources/input.msg").map(Row(_))
+ val svtp = StructField("value", StringType)
+ val sdf0 = sqlContext.createDataFrame(srdd, StructType(Array(svtp)))
+ sdf0.registerTempTable("sdf0")
+ sdf0.show(10)
+
+ // 1. read from json string to extracted json row
+ val sdf1 = sqlContext.sql("SELECT explode(from_json_to_array(get_json_object(value, '$.seeds'))) as seed FROM sdf0")
+ sdf1.printSchema
+ sdf1.show(10)
+ sdf1.registerTempTable("sdf1")
+
+ val sdf2 = sqlContext.sql("SELECT get_json_object(seed, '$.url') as url, cast(get_json_object(get_json_object(seed, '$.metadata'), '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM sdf1")
+ sdf2.printSchema
+ sdf2.show(10)
+
+ // ---------------------------------------
+
+ val trdd = sparkContext.textFile("src/test/resources/output.msg").map(Row(_))
+ val tvtp = StructField("value", StringType)
+ val tdf0 = sqlContext.createDataFrame(trdd, StructType(Array(tvtp)))
+ tdf0.registerTempTable("tdf0")
+ tdf0.printSchema()
+ tdf0.show(10)
+
+// val json2StringArray: (String) => Seq[String] = (s: String) => {
+// JsonUtil.fromJson[Seq[String]](s)
+// }
+// sqlContext.udf.register("json_to_string_array", json2StringArray)
+//
+// val json2StringJsonArray: (String) => Seq[String] = (s: String) => {
+// val seq = JsonUtil.fromJson[Seq[Any]](s)
+// seq.map(i => JsonUtil.toJson(i))
+// }
+// sqlContext.udf.register("json_to_string_json_array", json2StringJsonArray)
+//
+// val indexOf = (arr: Seq[String], v: String) => {
+// arr.indexOf(v)
+// }
+// sqlContext.udf.register("index_of", indexOf)
+//
+ val indexOfField = (arr: Seq[String], k: String, v: String) => {
+ val seq = arr.flatMap { item =>
+ JsonUtil.fromJson[Map[String, Any]](item).get(k)
+ }
+ seq.indexOf(v)
+ }
+ sqlContext.udf.register("index_of_field", indexOfField)
+
+ // 1. read from json string to extracted json row
+// val df1 = sqlContext.sql("SELECT get_json_object(value, '$.groups[0].attrsList') as attrs FROM tdf0")
+ val tdf1 = sqlContext.sql("SELECT from_json_to_array(get_json_object(value, '$.groups[0].attrsList')) as attrs FROM tdf0")
+ tdf1.printSchema
+ tdf1.show(10)
+ tdf1.registerTempTable("tdf1")
+
+// val tdf2 = sqlContext.sql("SELECT attrs[index_of_field(attrs, 'name', 'URL')] as attr1, attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')] as attr2 FROM tdf1")
+// tdf2.printSchema()
+// tdf2.show(10)
+// tdf2.registerTempTable("tdf2")
+
+ val tdf3 = sqlContext.sql("SELECT from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'URL')], '$.values'))[0] as url, cast(get_json_object(from_json_to_array(get_json_object(attrs[index_of_field(attrs, 'name', 'CRAWLMETADATA')], '$.values'))[0], '$.tracker.crawlRequestCreateTS') as bigint) as ts FROM tdf1")
+ tdf3.printSchema()
+ tdf3.show(10)
+ }
+
+ test ("sql functions") {
+ val functions = sqlContext.sql("show functions")
+ functions.printSchema()
+ functions.show(10)
+
+ val functionNames = functions.map(_.getString(0)).collect
+ functionNames.foreach(println)
+ }
+
+ test ("test text file read") {
+ val partitionPaths = Seq[String](
+ "hdfs://localhost/griffin/streaming/dump/source/418010/25080625/1504837518000",
+ "hdfs://localhost/griffin/streaming/dump/target/418010/25080625/1504837518000")
+ val df = sqlContext.read.json(partitionPaths: _*)
+ df.printSchema()
+ df.show(10)
+ }
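+
+ // note: this test and the next assume a local HDFS with the griffin
+ // streaming dump directories already populated; they will not pass on a
+ // bare checkout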
+
+ test ("list paths") {
+ val filePath = "hdfs://localhost/griffin/streaming/dump/source"
+ val partitionRanges = List[(Long, Long)]((0, 0), (-2, 0))
+ val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
+ println(partitionPaths)
+ }
+
+ private def listPathsBetweenRanges(paths: List[String],
+ partitionRanges: List[(Long, Long)]
+ ): List[String] = {
+ partitionRanges match {
+ case Nil => paths
+ case head :: tail => {
+ val (lb, ub) = head
+ val curPaths = paths.flatMap { path =>
+ val names = HdfsUtil.listSubPathsByType(path, "dir").toList
+ println(names)
+ names.filter { name =>
+ str2Long(name) match {
+ case Some(t) => (t >= lb) && (t <= ub)
+ case _ => false
+ }
+ }.map(HdfsUtil.getHdfsFilePath(path, _))
+ }
+ listPathsBetweenRanges(curPaths, tail)
+ }
+ }
+ }
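+
+ // example (illustrative): with ranges ((0, 0), (-2, 0)) as in the test
+ // above, the first pass keeps sub-directories whose numeric name lies in
+ // [0, 0], the second keeps their sub-directories with names in [-2, 0];
+ // each range filters one successive directory level by its literal name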
+
+ private def str2Long(str: String): Option[Long] = {
+ // Try(...).toOption swallows only non-fatal errors, unlike catching Throwable
+ Try(str.toLong).toOption
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
new file mode 100644
index 0000000..394917c
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/JsonToStructs.scala
@@ -0,0 +1,85 @@
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.utils.JsonUtil
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
+import org.apache.spark.sql.execution.datasources.json.JSONOptions
+import org.apache.spark.sql.types._
+import org.apache.spark.unsafe.types.UTF8String
+
+
+case class JsonToStructs(
+// schema: DataType,
+// options: Map[String, String],
+ child: Expression)
+ extends UnaryExpression with CodegenFallback with ExpectsInputTypes {
+ override def nullable: Boolean = true
+
+// def this(schema: DataType, options: Map[String, String], child: Expression) =
+// this(schema, options, child, None)
+
+ // Used in `FunctionRegistry`
+// def this(child: Expression, schema: Expression) =
+// this(
+// schema = JsonExprUtils.validateSchemaLiteral(schema),
+// options = Map.empty[String, String],
+// child = child,
+// timeZoneId = None)
+//
+// def this(child: Expression, schema: Expression, options: Expression) =
+// this(
+// schema = JsonExprUtils.validateSchemaLiteral(schema),
+// options = JsonExprUtils.convertToMapData(options),
+// child = child,
+// timeZoneId = None)
+//
+// override def checkInputDataTypes(): TypeCheckResult = schema match {
+// case _: StructType | ArrayType(_: StructType, _) =>
+// super.checkInputDataTypes()
+// case _ => TypeCheckResult.TypeCheckFailure(
+// s"Input schema ${schema.simpleString} must be a struct or an array of structs.")
+// }
+
+ override def dataType: DataType = MapType(StringType, StringType)
+
+// override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression =
+// copy(timeZoneId = Option(timeZoneId))
+
+ override def nullSafeEval(json: Any): Any = {
+ if (json.toString.trim.isEmpty) return null
+
+ try {
+ val map = JsonUtil.fromJson[Map[String, Any]](json.toString)
+ // Catalyst consumes its internal map format, not a Scala Map: convert
+ // keys/values to UTF8String to match MapType(StringType, StringType)
+ val keys = map.keys.map(k => UTF8String.fromString(k)).toArray[Any]
+ val values = map.values.map {
+ case s: String => UTF8String.fromString(s)
+ case other => UTF8String.fromString(JsonUtil.toJson(other))
+ }.toArray[Any]
+ new ArrayBasedMapData(new GenericArrayData(keys), new GenericArrayData(values))
+ } catch {
+ case _: Throwable => null
+ }
+ }
+
+ override def inputTypes: Seq[DataType] = StringType :: Nil
+}
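+
+// Usage sketch (illustrative): wrap a string column's Catalyst expression,
+// e.g. JsonToStructs(df("value").expr), to evaluate a JSON object string
+// into the declared map type; empty or malformed input evaluates to null.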
+//
+//object JsonExprUtils {
+//
+// def validateSchemaLiteral(exp: Expression): StructType = exp match {
+// case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString)
+// case e => throw new AnalysisException(s"Expected a string literal instead of $e")
+// }
+//
+// def convertToMapData(exp: Expression): Map[String, String] = exp match {
+// case m: CreateMap
+// if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
+// val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
+// ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
+// key.toString -> value.toString
+// }
+// case m: CreateMap =>
+// throw new AnalysisException(
+// s"A type of keys and values in map() must be string, but got ${m.dataType}")
+// case _ =>
+// throw new AnalysisException("Must use a map() function for options")
+// }
+//}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala b/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
new file mode 100644
index 0000000..07b7c5e
--- /dev/null
+++ b/measure/src/test/scala/org/apache/griffin/measure/process/StreamingProcessTest.scala
@@ -0,0 +1,147 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.process
+
+import org.apache.griffin.measure.config.params._
+import org.apache.griffin.measure.config.params.env._
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.config.reader.ParamReaderFactory
+import org.apache.griffin.measure.config.validator.AllParamValidator
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.persist.PersistThreadPool
+import org.junit.runner.RunWith
+import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
+
+import scala.util.{Failure, Success, Try}
+
+@RunWith(classOf[JUnitRunner])
+class StreamingProcessTest extends FunSuite with Matchers with BeforeAndAfter with Loggable {
+
+ val envFile = "src/test/resources/env-streaming.json"
+// val confFile = "src/test/resources/config-test-accuracy-streaming-multids.json"
+ val confFile = "src/test/resources/config-test-accuracy-streaming.json"
+// val confFile = "src/test/resources/config-test-profiling-streaming.json"
+
+ val envFsType = "local"
+ val userFsType = "local"
+
+ val args = Array(envFile, confFile)
+
+ var allParam: AllParam = _
+
+ before {
+ // read param files
+ val envParam = readParamFile[EnvParam](envFile, envFsType) match {
+ case Success(p) => p
+ case Failure(ex) => {
+ error(ex.getMessage)
+ sys.exit(-2)
+ }
+ }
+ val userParam = readParamFile[UserParam](confFile, userFsType) match {
+ case Success(p) => p
+ case Failure(ex) => {
+ error(ex.getMessage)
+ sys.exit(-2)
+ }
+ }
+ allParam = AllParam(envParam, userParam)
+
+ // validate param files
+ validateParams(allParam) match {
+ case Failure(ex) => {
+ error(ex.getMessage)
+ sys.exit(-3)
+ }
+ case _ => {
+ info("params validation pass")
+ }
+ }
+ }
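+
+ // (note: sys.exit above aborts the whole test JVM on bad params,
+ // mirroring the application entry point rather than failing this suite)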
+
+ test ("streaming process") {
+ val procType = ProcessType(allParam.userParam.procType)
+ val proc: DqProcess = procType match {
+ case BatchProcessType => BatchDqProcess(allParam)
+ case StreamingProcessType => StreamingDqProcess(allParam)
+ case _ => {
+ error(s"${procType} is unsupported process type!")
+ sys.exit(-4)
+ }
+ }
+
+ // process init
+ proc.init match {
+ case Success(_) => {
+ info("process init success")
+ }
+ case Failure(ex) => {
+ error(s"process init error: ${ex.getMessage}")
+ shutdown
+ sys.exit(-5)
+ }
+ }
+
+ // process run
+ proc.run match {
+ case Success(_) => {
+ info("process run success")
+ }
+ case Failure(ex) => {
+ error(s"process run error: ${ex.getMessage}")
+
+ if (proc.retriable) {
+ throw ex
+ } else {
+ shutdown
+ sys.exit(-5)
+ }
+ }
+ }
+
+ // process end
+ proc.end match {
+ case Success(_) => {
+ info("process end success")
+ }
+ case Failure(ex) => {
+ error(s"process end error: ${ex.getMessage}")
+ shutdown
+ sys.exit(-5)
+ }
+ }
+
+ shutdown
+ }
+
+ private def readParamFile[T <: Param](file: String, fsType: String)(implicit m : Manifest[T]): Try[T] = {
+ val paramReader = ParamReaderFactory.getParamReader(file, fsType)
+ paramReader.readConfig[T]
+ }
+
+ private def validateParams(allParam: AllParam): Try[Boolean] = {
+ val allParamValidator = AllParamValidator()
+ allParamValidator.validate(allParam)
+ }
+
+ private def shutdown(): Unit = {
+ PersistThreadPool.shutdown
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala
----------------------------------------------------------------------
diff --git a/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala b/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala
deleted file mode 100644
index dd8d4a0..0000000
--- a/measure/src/test/scala/org/apache/griffin/measure/rule/ExprValueUtilTest.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.rule
-
-import org.apache.griffin.measure.config.params.user.EvaluateRuleParam
-import org.apache.griffin.measure.rule.expr.{Expr, StatementExpr}
-import org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
-import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
-
-@RunWith(classOf[JUnitRunner])
-class ExprValueUtilTest extends FunSuite with BeforeAndAfter with Matchers {
-
- test ("rule calculation") {
- // val rules = "$source.json().name = 's2' and $source.json().age[*] = 32"
- // val rules = "$source.json().items[*] = 202 AND $source.json().age[*] = 32 AND $source.json().df[*].a = 1"
- val rules = "$source.json().items[*] = 202 AND $source.json().age[*] = 32 AND $source.json().df['a' = 1].b = 4"
- // val rules = "$source.json().df[0].a = 1"
- val ep = EvaluateRuleParam(1, rules)
-
- val ruleFactory = RuleFactory(ep)
- val rule: StatementExpr = ruleFactory.generateRule()
- val ruleAnalyzer: RuleAnalyzer = RuleAnalyzer(rule)
-
- val ruleExprs = ruleAnalyzer.sourceRuleExprs
- val constFinalExprValueMap = Map[String, Any]()
-
- val data = List[String](
- ("""{"name": "s1", "age": [22, 23], "items": [102, 104, 106], "df": [{"a": 1, "b": 3}, {"a": 2, "b": 4}]}"""),
- ("""{"name": "s2", "age": [32, 33], "items": [202, 204, 206], "df": [{"a": 1, "b": 4}, {"a": 2, "b": 4}]}"""),
- ("""{"name": "s3", "age": [42, 43], "items": [302, 304, 306], "df": [{"a": 1, "b": 5}, {"a": 2, "b": 4}]}""")
- )
-
- def str(expr: Expr) = {
- s"${expr._id}: ${expr.desc} [${expr.getClass.getSimpleName}]"
- }
- println("====")
- ruleExprs.finalCacheExprs.foreach { expr =>
- println(str(expr))
- }
- println("====")
- ruleExprs.cacheExprs.foreach { expr =>
- println(str(expr))
- }
-
- val constExprValueMap = ExprValueUtil.genExprValueMaps(None, ruleAnalyzer.constCacheExprs, Map[String, Any]())
- val finalConstExprValueMap = ExprValueUtil.updateExprValueMaps(ruleAnalyzer.constFinalCacheExprs, constExprValueMap)
- val finalConstMap = finalConstExprValueMap.headOption match {
- case Some(m) => m
- case _ => Map[String, Any]()
- }
- println("====")
- println(ruleAnalyzer.constCacheExprs)
- println(ruleAnalyzer.constFinalCacheExprs)
- println(finalConstMap)
-
- println("====")
- val valueMaps = data.flatMap { msg =>
- val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, finalConstMap)
- val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
- finalExprValueMaps
- }
-
- valueMaps.foreach(println)
- println(valueMaps.size)
-
- }
-
-}