Posted to commits@griffin.apache.org by li...@apache.org on 2018/04/26 08:12:45 UTC
[22/50] [abbrv] incubator-griffin git commit: batch pass
batch pass
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ada8a0d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ada8a0d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ada8a0d4
Branch: refs/heads/griffin-0.2.0-incubating-rc4
Commit: ada8a0d4b02a05793b220998cddeb1de5179d80d
Parents: 07fcb29
Author: Lionel Liu <bh...@163.com>
Authored: Mon Apr 16 22:07:09 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Apr 16 22:07:09 2018 +0800
----------------------------------------------------------------------
measure/pom.xml | 2 +-
.../data/source/cache/DataSourceCache.scala | 2 +-
.../data/source/cache/JsonDataSourceCache.scala | 4 +-
.../data/source/cache/OrcDataSourceCache.scala | 4 +-
.../source/cache/ParquetDataSourceCache.scala | 4 +-
.../griffin/measure/persist/MultiPersists.scala | 2 +-
.../griffin/measure/persist/Persist.scala | 2 +-
.../measure/process/BatchDqProcess.scala | 27 +-
.../measure/process/StreamingDqProcess.scala | 26 +-
.../process/engine/DataFrameOprEngine.scala | 10 +-
.../measure/process/engine/DqEngine.scala | 2 +-
.../measure/process/engine/DqEngines.scala | 2 +-
.../measure/process/engine/SparkDqEngine.scala | 9 +-
.../measure/process/engine/SparkSqlEngine.scala | 2 +-
.../measure/rule/adaptor/RuleAdaptorGroup.scala | 17 +-
.../griffin/measure/util/MessageUtil.java | 320 +++++++++----------
16 files changed, 233 insertions(+), 202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index 42fc967..2dffd35 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -117,7 +117,7 @@ under the License.
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_${scala.binary.version}</artifactId>
- <version>2.0.1</version>
+ <version>4.0.0</version>
</dependency>
<!--csv-->
<dependency>
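
The spark-avro bump from 2.0.1 to 4.0.0 tracks the Spark 2.x upgrade the rest of this commit performs (the 2.0.x line targets Spark 1.x). A minimal usage sketch under that assumption; the object name and path are placeholders, not from the patch:

import org.apache.spark.sql.{DataFrame, SparkSession}

object AvroReadSketch {
  // reads Avro through the databricks connector by its data source name;
  // "path" is a hypothetical input location
  def read(spark: SparkSession, path: String): DataFrame =
    spark.read.format("com.databricks.spark.avro").load(path)
}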
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
index 2412130..f70bd11 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -98,7 +98,7 @@ trait DataSourceCache extends DataCacheable with WithFanIn[Long] with Loggable w
val defOldCacheIndex = 0L
- protected def writeDataFrame(dfw: DataFrameWriter, path: String): Unit
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
def init(): Unit = {}
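
The signature change reflects Spark 2.x, where DataFrame is an alias for Dataset[Row] and df.write yields a parameterized DataFrameWriter[Row]. A sketch of a conforming implementation (names here are illustrative, not from the patch):

import org.apache.spark.sql.{DataFrame, DataFrameWriter, Row}

object WriterSketch {
  // df.write produces DataFrameWriter[Row] in Spark 2.x, matching the new
  // abstract signature; each cache subclass then picks a concrete format
  def writeParquet(df: DataFrame, path: String): Unit = {
    val dfw: DataFrameWriter[Row] = df.write.mode("append")
    dfw.parquet(path) // the JSON and ORC caches call dfw.json / dfw.orc instead
  }
}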
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
index e284d47..2fa5316 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.data.source.cache
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
dsName: String, index: Int
@@ -28,7 +28,7 @@ case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
// sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
}
- def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+ def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
println(s"write path: ${path}")
dfw.json(path)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
index 7b92bef..5bf2500 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.data.source.cache
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
dsName: String, index: Int
@@ -28,7 +28,7 @@ case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
// sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
}
- def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+ def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
println(s"write path: ${path}")
dfw.orc(path)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
index 89cd0b7..f39d832 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.data.source.cache
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
dsName: String, index: Int
@@ -28,7 +28,7 @@ case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any
sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
}
- def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+ def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
println(s"write path: ${path}")
dfw.parquet(path)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
index bed28fd..4b30cba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.persist
import org.apache.griffin.measure.result._
import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Dataset}
import scala.util.Try
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
index 361fad7..300f2c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.persist
import org.apache.griffin.measure.log.Loggable
import org.apache.griffin.measure.result._
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Dataset}
import scala.util.Try
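
This import change (here and in MultiPersists above) follows from the same alias: in Spark 2.x a DataFrame is just Dataset[Row], so persist signatures touching either type need both names in scope. A one-line sketch of the equivalence:

import org.apache.spark.sql.{DataFrame, Dataset, Row}

object AliasSketch {
  // identity in Spark 2.x: DataFrame and Dataset[Row] are the same type
  def asDataset(df: DataFrame): Dataset[Row] = df
}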
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 2770de8..04b608a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -30,9 +30,8 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters,
import org.apache.griffin.measure.rule.adaptor._
import org.apache.griffin.measure.rule.plan._
import org.apache.griffin.measure.rule.udf._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.SparkConf
import scala.util.Try
@@ -46,24 +45,31 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
val dataSourceNames = userParam.dataSources.map(_.name)
val baselineDsName = userParam.baselineDsName
- var sparkContext: SparkContext = _
+// var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
+ var sparkSession: SparkSession = _
+
def retriable: Boolean = false
def init: Try[_] = Try {
val conf = new SparkConf().setAppName(metricName)
conf.setAll(sparkParam.config)
- sparkContext = new SparkContext(conf)
- sparkContext.setLogLevel(sparkParam.logLevel)
- sqlContext = new HiveContext(sparkContext)
+ conf.set("spark.sql.crossJoin.enabled", "true")
+ sparkSession = if (conf.contains("hive.metastore.uris")) {
+ SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+ } else {
+ SparkSession.builder().config(conf).getOrCreate()
+ }
+ sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+ sqlContext = sparkSession.sqlContext
// register udf
GriffinUdfs.register(sqlContext)
GriffinUdafs.register(sqlContext)
// init adaptors
- RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
+ RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
}
def run: Try[_] = Try {
@@ -78,7 +84,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
val persist: Persist = persistFactory.getPersists(appTime)
// persist start id
- val applicationId = sparkContext.applicationId
+ val applicationId = sparkSession.sparkContext.applicationId
persist.start(applicationId)
// get dq engines
@@ -146,7 +152,8 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
DataFrameCaches.uncacheGlobalDataFrames()
DataFrameCaches.clearGlobalTrashDataFrames()
- sparkContext.stop
+ sparkSession.close()
+ sparkSession.stop()
}
// private def cleanData(t: Long): Unit = {
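
The pattern above replaces the Spark 1.x SparkContext + HiveContext pair with a single SparkSession, enabling Hive support only when a metastore URI is configured. A condensed, standalone sketch of that builder logic, under the same assumption that hive.metastore.uris marks a Hive deployment:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SessionSketch {
  def build(conf: SparkConf): SparkSession = {
    val builder = SparkSession.builder().config(conf)
    // enable Hive support only when a metastore is actually configured
    if (conf.contains("hive.metastore.uris")) builder.enableHiveSupport().getOrCreate()
    else builder.getOrCreate()
  }
}

The SQLContext is then recovered as sparkSession.sqlContext for APIs that still expect it, as the hunk shows.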
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
index b2af46a..b7ec449 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
import org.apache.griffin.measure.rule.udf._
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
@@ -46,17 +46,24 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
val dataSourceNames = userParam.dataSources.map(_.name)
val baselineDsName = userParam.baselineDsName
- var sparkContext: SparkContext = _
+// var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
+ var sparkSession: SparkSession = _
+
def retriable: Boolean = true
def init: Try[_] = Try {
val conf = new SparkConf().setAppName(metricName)
conf.setAll(sparkParam.config)
- sparkContext = new SparkContext(conf)
- sparkContext.setLogLevel(sparkParam.logLevel)
- sqlContext = new HiveContext(sparkContext)
+ conf.set("spark.sql.crossJoin.enabled", "true")
+ sparkSession = if (conf.contains("hive.metastore.uris")) {
+ SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+ } else {
+ SparkSession.builder().config(conf).getOrCreate()
+ }
+ sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+ sqlContext = sparkSession.sqlContext
// clear checkpoint directory
clearCpDir
@@ -71,7 +78,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
// init adaptors
val dataSourceNames = userParam.dataSources.map(_.name)
- RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
+ RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
}
def run: Try[_] = Try {
@@ -94,7 +101,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
val persist: Persist = persistFactory.getPersists(appTime)
// persist start id
- val applicationId = sparkContext.applicationId
+ val applicationId = sparkSession.sparkContext.applicationId
persist.start(applicationId)
// get dq engines
@@ -149,7 +156,8 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
DataFrameCaches.uncacheGlobalDataFrames()
DataFrameCaches.clearGlobalTrashDataFrames()
- sparkContext.stop
+ sparkSession.close()
+ sparkSession.stop()
InfoCacheInstance.close
}
@@ -159,7 +167,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
case Some(interval) => Milliseconds(interval)
case _ => throw new Exception("invalid batch interval")
}
- val ssc = new StreamingContext(sparkContext, batchInterval)
+ val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
ssc.checkpoint(sparkParam.cpDir)
ssc
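
StreamingDqProcess migrates the same way; with no standalone SparkContext field left, the StreamingContext is built from the session's underlying context. A sketch with the interval and checkpoint directory as placeholder parameters:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object SscSketch {
  def createSsc(spark: SparkSession, intervalMs: Long, cpDir: String): StreamingContext = {
    // the streaming context wraps the SparkContext owned by the session
    val ssc = new StreamingContext(spark.sparkContext, Milliseconds(intervalMs))
    ssc.checkpoint(cpDir)
    ssc
  }
}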
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
index 59b765e..600da45 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -38,6 +38,8 @@ import org.apache.griffin.measure.utils.ParamUtil._
import scala.util.Try
+import org.apache.spark.sql._
+
case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine {
def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
@@ -85,9 +87,11 @@ object DataFrameOprs {
val dfName = details.getOrElse(_dfName, "").toString
val colNameOpt = details.get(_colName).map(_.toString)
- val df = sqlContext.table(s"`${dfName}`")
+ implicit val encoder = Encoders.STRING
+
+ val df: DataFrame = sqlContext.table(s"`${dfName}`")
val rdd = colNameOpt match {
- case Some(colName: String) => df.map(_.getAs[String](colName))
+ case Some(colName: String) => df.map(r => r.getAs[String](colName))
case _ => df.map(_.getAs[String](0))
}
sqlContext.read.json(rdd) // slow process
@@ -116,6 +120,8 @@ object DataFrameOprs {
}
}
+ implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.bean(classOf[AccuracyResult]))
+
val df = sqlContext.table(s"`${dfName}`")
val results = df.flatMap { row =>
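
The implicit encoders above are a Spark 2.x requirement: DataFrame.map now returns a Dataset[T], and the compiler must resolve an Encoder[T] for the result type. A self-contained sketch of the first case; the helper name and table name are placeholders:

import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}

object EncoderSketch {
  def firstColumnAsJson(spark: SparkSession, tableName: String): DataFrame = {
    // without this implicit, df.map(...) fails to compile under Spark 2.x
    implicit val encoder = Encoders.STRING
    val df: DataFrame = spark.table(tableName)
    val jsonStrings: Dataset[String] = df.map(_.getAs[String](0))
    spark.read.json(jsonStrings.rdd) // same RDD-based JSON read as the patched code
  }
}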
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index 3d77458..a2de69d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.process.ProcessType
import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
trait DqEngine extends Loggable with Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 6b9a215..a3ef7dc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -31,7 +31,7 @@ import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.plan.{DsUpdate, _}
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
//import scala.concurrent._
//import scala.concurrent.duration.Duration
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index 382e302..4ba185f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.plan._
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql._
import org.apache.griffin.measure.utils.ParamUtil._
trait SparkDqEngine extends DqEngine {
@@ -115,10 +115,11 @@ trait SparkDqEngine extends DqEngine {
}
def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
- getRecordDataFrame(recordExport).map(_.toJSON)
+ getRecordDataFrame(recordExport).map(_.toJSON.rdd)
}
def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+ implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
val RecordExport(_, _, _, originDFOpt, defTmst, procType) = recordExport
getRecordDataFrame(recordExport) match {
case Some(stepDf) => {
@@ -149,7 +150,7 @@ trait SparkDqEngine extends DqEngine {
}
} else None
}
- (Some(records.groupByKey), emptyTmsts)
+ (Some(records.rdd.groupByKey), emptyTmsts)
} else (None, emptyTmsts)
}
case _ => {
@@ -163,7 +164,7 @@ trait SparkDqEngine extends DqEngine {
case e: Throwable => None
}
}
- (Some(records.groupByKey), Set[Long]())
+ (Some(records.rdd.groupByKey), Set[Long]())
}
}
}
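
Both edits in this file are Dataset-to-RDD shims: in Spark 2.x, toJSON returns Dataset[String] rather than RDD[String], and groupByKey on a Dataset has different semantics than the pair-RDD version, so .rdd restores the old behavior. A sketch of the first shim:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame

object ToJsonSketch {
  // .rdd converts the Spark 2.x Dataset[String] back to the RDD[String]
  // the persist layer expects
  def records(df: DataFrame): RDD[String] = df.toJSON.rdd
}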
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 438595b..e4ecb49 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.plan._
import org.apache.griffin.measure.utils.JsonUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, GroupedData, SQLContext}
+import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.StreamingContext
case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
index 30a356c..5b5f419 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.process.ProcessType
import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
import org.apache.griffin.measure.rule.dsl._
import org.apache.griffin.measure.rule.plan._
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
import scala.collection.mutable.{Map => MutableMap}
@@ -46,14 +46,23 @@ object RuleAdaptorGroup {
functionNames = funcNames
}
- def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): Unit = {
- val functions = sqlContext.sql("show functions")
- functionNames = functions.map(_.getString(0)).collect.toSeq
+ def init(sparkSession: SparkSession, dsNames: Seq[String], blDsName: String): Unit = {
+ implicit val encoder = Encoders.STRING
+ val functions = sparkSession.catalog.listFunctions
+ functionNames = functions.map(_.name).collect.toSeq
dataSourceNames = dsNames
baselineDsName = blDsName
}
+// def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): Unit = {
+ // val functions = sqlContext.sql("show functions")
+ // functionNames = functions.map(_.getString(0)).collect.toSeq
+ // dataSourceNames = dsNames
+ //
+ // baselineDsName = blDsName
+ // }
+
private def getDslType(param: Map[String, Any], defDslType: DslType) = {
DslType(param.getOrElse(_dslType, defDslType.desc).toString)
}
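
The rewritten init sources function names from the catalog API instead of a "show functions" query, and again needs a String encoder for the map over the resulting Dataset. A standalone sketch of that lookup:

import org.apache.spark.sql.{Encoders, SparkSession}

object FunctionNamesSketch {
  def functionNames(spark: SparkSession): Seq[String] = {
    implicit val encoder = Encoders.STRING
    // catalog.listFunctions returns Dataset[catalog.Function]; map to names
    spark.catalog.listFunctions.map(_.name).collect.toSeq
  }
}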
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
index 6fba5ff..4f15b62 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
+++ b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
@@ -16,163 +16,163 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
-package org.apache.griffin.measure.util;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.StringUtils;
-import java.util.ArrayList;
-import java.io.*;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.*;
-import java.util.*;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-import org.apache.griffin.measure.config.params.env.SMSParam;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Created by xiaoqiu.duan on 2017/9/11.
- */
-public class MessageUtil {
-
-
- public static String sendSMSCode(String teamPhone, String content, SMSParam smsParam){
- String url=smsParam.host();
- String SYS_ID=smsParam.id();
- String KEY=smsParam.key();
- String sendContext="["+smsParam.uuid()+"]: "+content ;
- System.out.println(" sendContext: "+sendContext);
- Long timestamp=new Date().getTime()+20610;
- System.out.println(" timestamp: "+timestamp);
- String[] tels=teamPhone.split(",") ;
- String uuid = UUID.randomUUID().toString().replaceAll("-", "");
- Map<String,Object> param=new HashMap<String, Object>();
- param.put("apportionment","JR_DATA_ALARM");
- param.put("event",0);
- param.put("eventTime",0);
- param.put("isMasked",false);
- param.put("originator","ETC");
- param.put("reqId",uuid);
- param.put("schemaKey","");
- param.put("sysId",SYS_ID);
- param.put("template",sendContext);
- param.put("templateId","");
- param.put("timestamp",timestamp);
- param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY));
- param.put("typeCode","JR_DATA_ALARM");
- System.out.println("params: "+param);
- List<Map<String, Object>> bodys = new ArrayList<Map<String, Object>>();
- for (int i=0;i<tels.length;i++) {
- Map<String, Object> body = new HashMap<String, Object>();
- body.put("phoneNo", tels[i]);
- body.put("params", "");
- body.put("userId", 0);
- bodys.add(body);
- }
- System.out.println("bodys: "+bodys);
- JSONObject jsonParam = new JSONObject();
- try {
- jsonParam.put("params",param);
- jsonParam.put("bodys",bodys);
- System.out.println("jsonParam: "+jsonParam);
- System.out.println("jsonParam: "+jsonParam.toString());
- } catch (JSONException e) {
- e.printStackTrace();
- }
- URL u=null;
- int smsnum=0;
- HttpURLConnection connection=null;
- try{
- String result= postRequestUrl(url, jsonParam.toString(), "utf-8");
- return "send success";
- }catch(Exception e){
- e.printStackTrace();
- return null;
- }
-
- }
-
-
- public static String postRequestUrl(String url, String param,String encode) {
- OutputStreamWriter out = null;
- BufferedReader reader = null;
- String response="";
- try {
- URL httpUrl = null;
- httpUrl = new URL(url);
- HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
- conn.setRequestMethod("POST");
- conn.setRequestProperty("Content-Type", "application/json");//x-www-form-urlencoded
- conn.setRequestProperty("connection", "keep-alive");
- conn.setUseCaches(false);
- conn.setInstanceFollowRedirects(true);
- conn.setDoOutput(true);
- conn.setDoInput(true);
- conn.connect();
- //POST
- out = new OutputStreamWriter(
- conn.getOutputStream());
- out.write(param);
-
- out.flush();
-
- System.out.println("send POST "+conn.getResponseCode());
- reader = new BufferedReader(new InputStreamReader(
- conn.getInputStream()));
-
- String lines;
- while ((lines = reader.readLine()) != null) {
- lines = new String(lines.getBytes(), "utf-8");
- response+=lines;
- System.out.println("lines: "+lines);
- }
- reader.close();
- conn.disconnect();
-
- //log.info(response.toString());
- } catch (Exception e) {
- System.out.println("send POST error!"+e);
- e.printStackTrace();
- }
- finally{
- try{
- if(out!=null){
- out.close();
- }
- if(reader!=null){
- reader.close();
- }
- }
- catch(IOException ex){
- ex.printStackTrace();
- }
- }
-
- return response;
- }
-
-
-
- private static String readBufferedContent(BufferedReader bufferedReader) {
- if (bufferedReader == null)
- return null;
- StringBuffer result = new StringBuffer();
- String line = null;
- try {
- while (StringUtils.isNotBlank((line = bufferedReader.readLine()))) {
- result.append(line+"\r\n");
- }
- } catch (IOException e) {
- e.printStackTrace();
- return null;
- }
- return result.toString();
- }
-
-}
+//package org.apache.griffin.measure.util;
+//
+//import java.io.IOException;
+//
+//import org.apache.commons.lang.StringUtils;
+//import java.util.ArrayList;
+//import java.io.*;
+//import java.io.BufferedReader;
+//import java.io.InputStreamReader;
+//import java.net.*;
+//import java.util.*;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//
+//import org.apache.griffin.measure.config.params.env.SMSParam;
+//import org.json.JSONException;
+//import org.json.JSONObject;
+//
+///**
+// * Created by xiaoqiu.duan on 2017/9/11.
+// */
+//public class MessageUtil {
+//
+//
+// public static String sendSMSCode(String teamPhone, String content, SMSParam smsParam){
+// String url=smsParam.host();
+// String SYS_ID=smsParam.id();
+// String KEY=smsParam.key();
+// String sendContext="["+smsParam.uuid()+"]: "+content ;
+// System.out.println(" sendContext: "+sendContext);
+// Long timestamp=new Date().getTime()+20610;
+// System.out.println(" timestamp: "+timestamp);
+// String[] tels=teamPhone.split(",") ;
+// String uuid = UUID.randomUUID().toString().replaceAll("-", "");
+// Map<String,Object> param=new HashMap<String, Object>();
+// param.put("apportionment","JR_DATA_ALARM");
+// param.put("event",0);
+// param.put("eventTime",0);
+// param.put("isMasked",false);
+// param.put("originator","ETC");
+// param.put("reqId",uuid);
+// param.put("schemaKey","");
+// param.put("sysId",SYS_ID);
+// param.put("template",sendContext);
+// param.put("templateId","");
+// param.put("timestamp",timestamp);
+// param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY));
+// param.put("typeCode","JR_DATA_ALARM");
+// System.out.println("params: "+param);
+// List<Map<String, Object>> bodys = new ArrayList<Map<String, Object>>();
+// for (int i=0;i<tels.length;i++) {
+// Map<String, Object> body = new HashMap<String, Object>();
+// body.put("phoneNo", tels[i]);
+// body.put("params", "");
+// body.put("userId", 0);
+// bodys.add(body);
+// }
+// System.out.println("bodys: "+bodys);
+// JSONObject jsonParam = new JSONObject();
+// try {
+// jsonParam.put("params",param);
+// jsonParam.put("bodys",bodys);
+// System.out.println("jsonParam: "+jsonParam);
+// System.out.println("jsonParam: "+jsonParam.toString());
+// } catch (JSONException e) {
+// e.printStackTrace();
+// }
+// URL u=null;
+// int smsnum=0;
+// HttpURLConnection connection=null;
+// try{
+// String result= postRequestUrl(url, jsonParam.toString(), "utf-8");
+// return "send success";
+// }catch(Exception e){
+// e.printStackTrace();
+// return null;
+// }
+//
+// }
+//
+//
+// public static String postRequestUrl(String url, String param,String encode) {
+// OutputStreamWriter out = null;
+// BufferedReader reader = null;
+// String response="";
+// try {
+// URL httpUrl = null;
+// httpUrl = new URL(url);
+// HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
+// conn.setRequestMethod("POST");
+// conn.setRequestProperty("Content-Type", "application/json");//x-www-form-urlencoded
+// conn.setRequestProperty("connection", "keep-alive");
+// conn.setUseCaches(false);
+// conn.setInstanceFollowRedirects(true);
+// conn.setDoOutput(true);
+// conn.setDoInput(true);
+// conn.connect();
+// //POST
+// out = new OutputStreamWriter(
+// conn.getOutputStream());
+// out.write(param);
+//
+// out.flush();
+//
+// System.out.println("send POST "+conn.getResponseCode());
+// reader = new BufferedReader(new InputStreamReader(
+// conn.getInputStream()));
+//
+// String lines;
+// while ((lines = reader.readLine()) != null) {
+// lines = new String(lines.getBytes(), "utf-8");
+// response+=lines;
+// System.out.println("lines: "+lines);
+// }
+// reader.close();
+// conn.disconnect();
+//
+// //log.info(response.toString());
+// } catch (Exception e) {
+// System.out.println("send POST error!"+e);
+// e.printStackTrace();
+// }
+// finally{
+// try{
+// if(out!=null){
+// out.close();
+// }
+// if(reader!=null){
+// reader.close();
+// }
+// }
+// catch(IOException ex){
+// ex.printStackTrace();
+// }
+// }
+//
+// return response;
+// }
+//
+//
+//
+// private static String readBufferedContent(BufferedReader bufferedReader) {
+// if (bufferedReader == null)
+// return null;
+// StringBuffer result = new StringBuffer();
+// String line = null;
+// try {
+// while (StringUtils.isNotBlank((line = bufferedReader.readLine()))) {
+// result.append(line+"\r\n");
+// }
+// } catch (IOException e) {
+// e.printStackTrace();
+// return null;
+// }
+// return result.toString();
+// }
+//
+//}