You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by li...@apache.org on 2018/04/26 08:12:45 UTC

[22/50] [abbrv] incubator-griffin git commit: batch pass

batch pass


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ada8a0d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ada8a0d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ada8a0d4

Branch: refs/heads/griffin-0.2.0-incubating-rc4
Commit: ada8a0d4b02a05793b220998cddeb1de5179d80d
Parents: 07fcb29
Author: Lionel Liu <bh...@163.com>
Authored: Mon Apr 16 22:07:09 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Apr 16 22:07:09 2018 +0800

----------------------------------------------------------------------
 measure/pom.xml                                 |   2 +-
 .../data/source/cache/DataSourceCache.scala     |   2 +-
 .../data/source/cache/JsonDataSourceCache.scala |   4 +-
 .../data/source/cache/OrcDataSourceCache.scala  |   4 +-
 .../source/cache/ParquetDataSourceCache.scala   |   4 +-
 .../griffin/measure/persist/MultiPersists.scala |   2 +-
 .../griffin/measure/persist/Persist.scala       |   2 +-
 .../measure/process/BatchDqProcess.scala        |  27 +-
 .../measure/process/StreamingDqProcess.scala    |  26 +-
 .../process/engine/DataFrameOprEngine.scala     |  10 +-
 .../measure/process/engine/DqEngine.scala       |   2 +-
 .../measure/process/engine/DqEngines.scala      |   2 +-
 .../measure/process/engine/SparkDqEngine.scala  |   9 +-
 .../measure/process/engine/SparkSqlEngine.scala |   2 +-
 .../measure/rule/adaptor/RuleAdaptorGroup.scala |  17 +-
 .../griffin/measure/util/MessageUtil.java       | 320 +++++++++----------
 16 files changed, 233 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/pom.xml
----------------------------------------------------------------------
diff --git a/measure/pom.xml b/measure/pom.xml
index 42fc967..2dffd35 100644
--- a/measure/pom.xml
+++ b/measure/pom.xml
@@ -117,7 +117,7 @@ under the License.
     <dependency>
       <groupId>com.databricks</groupId>
       <artifactId>spark-avro_${scala.binary.version}</artifactId>
-      <version>2.0.1</version>
+      <version>4.0.0</version>
     </dependency>
     <!--csv-->
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
index 2412130..f70bd11 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/DataSourceCache.scala
@@ -98,7 +98,7 @@ trait DataSourceCache extends DataCacheable with WithFanIn[Long] with Loggable w
 
   val defOldCacheIndex = 0L
 
-  protected def writeDataFrame(dfw: DataFrameWriter, path: String): Unit
+  protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
   protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
 
   def init(): Unit = {}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
index e284d47..2fa5316 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/JsonDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.data.source.cache
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
 
 case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                                dsName: String, index: Int
@@ -28,7 +28,7 @@ case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
 //    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
   }
 
-  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
     println(s"write path: ${path}")
     dfw.json(path)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
index 7b92bef..5bf2500 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/OrcDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.data.source.cache
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
 
 case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                               dsName: String, index: Int
@@ -28,7 +28,7 @@ case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
 //    sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false");
   }
 
-  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
     println(s"write path: ${path}")
     dfw.orc(path)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
index 89cd0b7..f39d832 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/source/cache/ParquetDataSourceCache.scala
@@ -18,7 +18,7 @@ under the License.
 */
 package org.apache.griffin.measure.data.source.cache
 
-import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, SQLContext}
+import org.apache.spark.sql._
 
 case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
                                   dsName: String, index: Int
@@ -28,7 +28,7 @@ case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any
     sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
   }
 
-  def writeDataFrame(dfw: DataFrameWriter, path: String): Unit = {
+  def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
     println(s"write path: ${path}")
     dfw.parquet(path)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
index bed28fd..4b30cba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/MultiPersists.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.persist
 import org.apache.griffin.measure.result._
 import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Dataset}
 
 import scala.util.Try
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
index 361fad7..300f2c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/persist/Persist.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.persist
 import org.apache.griffin.measure.log.Loggable
 import org.apache.griffin.measure.result._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.{DataFrame, Dataset}
 
 import scala.util.Try
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
index 2770de8..04b608a 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/BatchDqProcess.scala
@@ -30,9 +30,8 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters,
 import org.apache.griffin.measure.rule.adaptor._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.rule.udf._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{SQLContext, SparkSession}
+import org.apache.spark.SparkConf
 
 import scala.util.Try
 
@@ -46,24 +45,31 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
   val dataSourceNames = userParam.dataSources.map(_.name)
   val baselineDsName = userParam.baselineDsName
 
-  var sparkContext: SparkContext = _
+//  var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
 
+  var sparkSession: SparkSession = _
+
   def retriable: Boolean = false
 
   def init: Try[_] = Try {
     val conf = new SparkConf().setAppName(metricName)
     conf.setAll(sparkParam.config)
-    sparkContext = new SparkContext(conf)
-    sparkContext.setLogLevel(sparkParam.logLevel)
-    sqlContext = new HiveContext(sparkContext)
+    conf.set("spark.sql.crossJoin.enabled", "true")
+    sparkSession = if (conf.contains("hive.metastore.uris")) {
+      SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+    } else {
+      SparkSession.builder().config(conf).getOrCreate()
+    }
+    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sqlContext = sparkSession.sqlContext
 
     // register udf
     GriffinUdfs.register(sqlContext)
     GriffinUdafs.register(sqlContext)
 
     // init adaptors
-    RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
+    RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
   }
 
   def run: Try[_] = Try {
@@ -78,7 +84,7 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     val persist: Persist = persistFactory.getPersists(appTime)
 
     // persist start id
-    val applicationId = sparkContext.applicationId
+    val applicationId = sparkSession.sparkContext.applicationId
     persist.start(applicationId)
 
     // get dq engines
@@ -146,7 +152,8 @@ case class BatchDqProcess(allParam: AllParam) extends DqProcess {
     DataFrameCaches.uncacheGlobalDataFrames()
     DataFrameCaches.clearGlobalTrashDataFrames()
 
-    sparkContext.stop
+    sparkSession.close()
+    sparkSession.stop()
   }
 
 //  private def cleanData(t: Long): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
index b2af46a..b7ec449 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/StreamingDqProcess.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.process.temp.{DataFrameCaches, TableRegisters}
 import org.apache.griffin.measure.rule.adaptor.RuleAdaptorGroup
 import org.apache.griffin.measure.rule.udf._
 import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.{SparkConf, SparkContext}
@@ -46,17 +46,24 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
   val dataSourceNames = userParam.dataSources.map(_.name)
   val baselineDsName = userParam.baselineDsName
 
-  var sparkContext: SparkContext = _
+//  var sparkContext: SparkContext = _
   var sqlContext: SQLContext = _
 
+  var sparkSession: SparkSession = _
+
   def retriable: Boolean = true
 
   def init: Try[_] = Try {
     val conf = new SparkConf().setAppName(metricName)
     conf.setAll(sparkParam.config)
-    sparkContext = new SparkContext(conf)
-    sparkContext.setLogLevel(sparkParam.logLevel)
-    sqlContext = new HiveContext(sparkContext)
+    conf.set("spark.sql.crossJoin.enabled", "true")
+    sparkSession = if (conf.contains("hive.metastore.uris")) {
+      SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
+    } else {
+      SparkSession.builder().config(conf).getOrCreate()
+    }
+    sparkSession.sparkContext.setLogLevel(sparkParam.logLevel)
+    sqlContext = sparkSession.sqlContext
 
     // clear checkpoint directory
     clearCpDir
@@ -71,7 +78,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
 
     // init adaptors
     val dataSourceNames = userParam.dataSources.map(_.name)
-    RuleAdaptorGroup.init(sqlContext, dataSourceNames, baselineDsName)
+    RuleAdaptorGroup.init(sparkSession, dataSourceNames, baselineDsName)
   }
 
   def run: Try[_] = Try {
@@ -94,7 +101,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     val persist: Persist = persistFactory.getPersists(appTime)
 
     // persist start id
-    val applicationId = sparkContext.applicationId
+    val applicationId = sparkSession.sparkContext.applicationId
     persist.start(applicationId)
 
     // get dq engines
@@ -149,7 +156,8 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
     DataFrameCaches.uncacheGlobalDataFrames()
     DataFrameCaches.clearGlobalTrashDataFrames()
 
-    sparkContext.stop
+    sparkSession.close()
+    sparkSession.stop()
 
     InfoCacheInstance.close
   }
@@ -159,7 +167,7 @@ case class StreamingDqProcess(allParam: AllParam) extends DqProcess {
       case Some(interval) => Milliseconds(interval)
       case _ => throw new Exception("invalid batch interval")
     }
-    val ssc = new StreamingContext(sparkContext, batchInterval)
+    val ssc = new StreamingContext(sparkSession.sparkContext, batchInterval)
     ssc.checkpoint(sparkParam.cpDir)
 
     ssc

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
index 59b765e..600da45 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DataFrameOprEngine.scala
@@ -38,6 +38,8 @@ import org.apache.griffin.measure.utils.ParamUtil._
 
 import scala.util.Try
 
+import org.apache.spark.sql._
+
 case class DataFrameOprEngine(sqlContext: SQLContext) extends SparkDqEngine {
 
   def runRuleStep(timeInfo: TimeInfo, ruleStep: RuleStep): Boolean = {
@@ -85,9 +87,11 @@ object DataFrameOprs {
     val dfName = details.getOrElse(_dfName, "").toString
     val colNameOpt = details.get(_colName).map(_.toString)
 
-    val df = sqlContext.table(s"`${dfName}`")
+    implicit val encoder = Encoders.STRING
+
+    val df: DataFrame = sqlContext.table(s"`${dfName}`")
     val rdd = colNameOpt match {
-      case Some(colName: String) => df.map(_.getAs[String](colName))
+      case Some(colName: String) => df.map(r => r.getAs[String](colName))
       case _ => df.map(_.getAs[String](0))
     }
     sqlContext.read.json(rdd) // slow process
@@ -116,6 +120,8 @@ object DataFrameOprs {
       }
     }
 
+    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.bean(classOf[AccuracyResult]))
+
     val df = sqlContext.table(s"`${dfName}`")
 
     val results = df.flatMap { row =>

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
index 3d77458..a2de69d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngine.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.process.ProcessType
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan.{TimeInfo, _}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 trait DqEngine extends Loggable with Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
index 6b9a215..a3ef7dc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/DqEngines.scala
@@ -31,7 +31,7 @@ import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan.{DsUpdate, _}
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 
 //import scala.concurrent._
 //import scala.concurrent.duration.Duration

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
index 382e302..4ba185f 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkDqEngine.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql._
 import org.apache.griffin.measure.utils.ParamUtil._
 
 trait SparkDqEngine extends DqEngine {
@@ -115,10 +115,11 @@ trait SparkDqEngine extends DqEngine {
   }
 
   def collectBatchRecords(recordExport: RecordExport): Option[RDD[String]] = {
-    getRecordDataFrame(recordExport).map(_.toJSON)
+    getRecordDataFrame(recordExport).map(_.toJSON.rdd)
   }
 
   def collectStreamingRecords(recordExport: RecordExport): (Option[RDD[(Long, Iterable[String])]], Set[Long]) = {
+    implicit val encoder = Encoders.tuple(Encoders.scalaLong, Encoders.STRING)
     val RecordExport(_, _, _, originDFOpt, defTmst, procType) = recordExport
     getRecordDataFrame(recordExport) match {
       case Some(stepDf) => {
@@ -149,7 +150,7 @@ trait SparkDqEngine extends DqEngine {
                   }
                 } else None
               }
-              (Some(records.groupByKey), emptyTmsts)
+              (Some(records.rdd.groupByKey), emptyTmsts)
             } else (None, emptyTmsts)
           }
           case _ => {
@@ -163,7 +164,7 @@ trait SparkDqEngine extends DqEngine {
                 case e: Throwable => None
               }
             }
-            (Some(records.groupByKey), Set[Long]())
+            (Some(records.rdd.groupByKey), Set[Long]())
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
index 438595b..e4ecb49 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/process/engine/SparkSqlEngine.scala
@@ -29,7 +29,7 @@ import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
 import org.apache.griffin.measure.utils.JsonUtil
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, GroupedData, SQLContext}
+import org.apache.spark.sql.SQLContext
 import org.apache.spark.streaming.StreamingContext
 
 case class SparkSqlEngine(sqlContext: SQLContext) extends SparkDqEngine {

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
index 30a356c..5b5f419 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/rule/adaptor/RuleAdaptorGroup.scala
@@ -24,7 +24,7 @@ import org.apache.griffin.measure.process.ProcessType
 import org.apache.griffin.measure.process.temp.{TableRegisters, TimeRange}
 import org.apache.griffin.measure.rule.dsl._
 import org.apache.griffin.measure.rule.plan._
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
 
 import scala.collection.mutable.{Map => MutableMap}
 
@@ -46,14 +46,23 @@ object RuleAdaptorGroup {
     functionNames = funcNames
   }
 
-  def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): Unit = {
-    val functions = sqlContext.sql("show functions")
-    functionNames = functions.map(_.getString(0)).collect.toSeq
+  def init(sparkSession: SparkSession, dsNames: Seq[String], blDsName: String): Unit = {
+    implicit val encoder = Encoders.STRING
+    val functions = sparkSession.catalog.listFunctions
+    functionNames = functions.map(_.name).collect.toSeq
     dataSourceNames = dsNames
 
     baselineDsName = blDsName
   }
 
+//  def init(sqlContext: SQLContext, dsNames: Seq[String], blDsName: String): Unit = {
+  //    val functions = sqlContext.sql("show functions")
+  //    functionNames = functions.map(_.getString(0)).collect.toSeq
+  //    dataSourceNames = dsNames
+  //
+  //    baselineDsName = blDsName
+  //  }
+
   private def getDslType(param: Map[String, Any], defDslType: DslType) = {
     DslType(param.getOrElse(_dslType, defDslType.desc).toString)
   }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ada8a0d4/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
index 6fba5ff..4f15b62 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
+++ b/measure/src/main/scala/org/apache/griffin/measure/util/MessageUtil.java
@@ -16,163 +16,163 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
-package org.apache.griffin.measure.util;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.StringUtils;
-import java.util.ArrayList;
-import java.io.*;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.*;
-import java.util.*;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-import org.apache.griffin.measure.config.params.env.SMSParam;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Created by xiaoqiu.duan on 2017/9/11.
- */
-public class MessageUtil {
-
-
-    public static String sendSMSCode(String teamPhone, String content, SMSParam smsParam){
-        String url=smsParam.host();
-        String SYS_ID=smsParam.id();
-        String KEY=smsParam.key();
-        String sendContext="["+smsParam.uuid()+"]: "+content ;
-        System.out.println(" sendContext:  "+sendContext);
-        Long timestamp=new Date().getTime()+20610;
-        System.out.println(" timestamp:  "+timestamp);
-        String[]  tels=teamPhone.split(",") ;
-        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
-        Map<String,Object> param=new HashMap<String, Object>();
-        param.put("apportionment","JR_DATA_ALARM");
-        param.put("event",0);
-        param.put("eventTime",0);
-        param.put("isMasked",false);
-        param.put("originator","ETC");
-        param.put("reqId",uuid);
-        param.put("schemaKey","");
-        param.put("sysId",SYS_ID);
-        param.put("template",sendContext);
-        param.put("templateId","");
-        param.put("timestamp",timestamp);
-        param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY));
-        param.put("typeCode","JR_DATA_ALARM");
-        System.out.println("params:  "+param);
-        List<Map<String, Object>> bodys = new ArrayList<Map<String, Object>>();
-          for (int i=0;i<tels.length;i++) {
-            Map<String, Object> body = new HashMap<String, Object>();
-            body.put("phoneNo", tels[i]);
-            body.put("params", "");
-            body.put("userId", 0);
-            bodys.add(body);
-        }
-        System.out.println("bodys:  "+bodys);
-        JSONObject jsonParam = new JSONObject();
-        try {
-            jsonParam.put("params",param);
-            jsonParam.put("bodys",bodys);
-            System.out.println("jsonParam:  "+jsonParam);
-            System.out.println("jsonParam:  "+jsonParam.toString());
-        } catch (JSONException e) {
-            e.printStackTrace();
-        }
-        URL u=null;
-        int smsnum=0;
-        HttpURLConnection connection=null;
-        try{
-           String result= postRequestUrl(url, jsonParam.toString(), "utf-8");
-            return "send success";
-        }catch(Exception e){
-            e.printStackTrace();
-            return null;
-        }
-
-    }
-
-
-    public static String postRequestUrl(String url, String param,String encode) {
-        OutputStreamWriter out = null;
-        BufferedReader reader = null;
-        String response="";
-        try {
-            URL httpUrl = null;
-            httpUrl = new URL(url);
-            HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
-            conn.setRequestMethod("POST");
-            conn.setRequestProperty("Content-Type", "application/json");//x-www-form-urlencoded
-            conn.setRequestProperty("connection", "keep-alive");
-            conn.setUseCaches(false);
-            conn.setInstanceFollowRedirects(true);
-            conn.setDoOutput(true);
-            conn.setDoInput(true);
-            conn.connect();
-            //POST
-            out = new OutputStreamWriter(
-                    conn.getOutputStream());
-            out.write(param);
-
-            out.flush();
-
-            System.out.println("send POST "+conn.getResponseCode());
-            reader = new BufferedReader(new InputStreamReader(
-                    conn.getInputStream()));
-
-            String lines;
-            while ((lines = reader.readLine()) != null) {
-                lines = new String(lines.getBytes(), "utf-8");
-                response+=lines;
-                System.out.println("lines:  "+lines);
-            }
-            reader.close();
-            conn.disconnect();
-
-            //log.info(response.toString());
-        } catch (Exception e) {
-            System.out.println("send POST error!"+e);
-            e.printStackTrace();
-        }
-        finally{
-            try{
-                if(out!=null){
-                    out.close();
-                }
-                if(reader!=null){
-                    reader.close();
-                }
-            }
-            catch(IOException ex){
-                ex.printStackTrace();
-            }
-        }
-
-        return response;
-    }
-
-
-
-    private static String readBufferedContent(BufferedReader bufferedReader) {
-        if (bufferedReader == null)
-            return null;
-        StringBuffer result = new StringBuffer();
-        String line = null;
-        try {
-            while (StringUtils.isNotBlank((line = bufferedReader.readLine()))) {
-                result.append(line+"\r\n");
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-            return null;
-        }
-        return result.toString();
-    }
-
-}
+//package org.apache.griffin.measure.util;
+//
+//import java.io.IOException;
+//
+//import org.apache.commons.lang.StringUtils;
+//import java.util.ArrayList;
+//import java.io.*;
+//import java.io.BufferedReader;
+//import java.io.InputStreamReader;
+//import java.net.*;
+//import java.util.*;
+//import java.util.HashMap;
+//import java.util.List;
+//import java.util.Map;
+//
+//
+//import org.apache.griffin.measure.config.params.env.SMSParam;
+//import org.json.JSONException;
+//import org.json.JSONObject;
+//
+///**
+// * Created by xiaoqiu.duan on 2017/9/11.
+// */
+//public class MessageUtil {
+//
+//
+//    public static String sendSMSCode(String teamPhone, String content, SMSParam smsParam){
+//        String url=smsParam.host();
+//        String SYS_ID=smsParam.id();
+//        String KEY=smsParam.key();
+//        String sendContext="["+smsParam.uuid()+"]: "+content ;
+//        System.out.println(" sendContext:  "+sendContext);
+//        Long timestamp=new Date().getTime()+20610;
+//        System.out.println(" timestamp:  "+timestamp);
+//        String[]  tels=teamPhone.split(",") ;
+//        String uuid = UUID.randomUUID().toString().replaceAll("-", "");
+//        Map<String,Object> param=new HashMap<String, Object>();
+//        param.put("apportionment","JR_DATA_ALARM");
+//        param.put("event",0);
+//        param.put("eventTime",0);
+//        param.put("isMasked",false);
+//        param.put("originator","ETC");
+//        param.put("reqId",uuid);
+//        param.put("schemaKey","");
+//        param.put("sysId",SYS_ID);
+//        param.put("template",sendContext);
+//        param.put("templateId","");
+//        param.put("timestamp",timestamp);
+//        param.put("token",Md5Util.md5(SYS_ID+timestamp+KEY));
+//        param.put("typeCode","JR_DATA_ALARM");
+//        System.out.println("params:  "+param);
+//        List<Map<String, Object>> bodys = new ArrayList<Map<String, Object>>();
+//          for (int i=0;i<tels.length;i++) {
+//            Map<String, Object> body = new HashMap<String, Object>();
+//            body.put("phoneNo", tels[i]);
+//            body.put("params", "");
+//            body.put("userId", 0);
+//            bodys.add(body);
+//        }
+//        System.out.println("bodys:  "+bodys);
+//        JSONObject jsonParam = new JSONObject();
+//        try {
+//            jsonParam.put("params",param);
+//            jsonParam.put("bodys",bodys);
+//            System.out.println("jsonParam:  "+jsonParam);
+//            System.out.println("jsonParam:  "+jsonParam.toString());
+//        } catch (JSONException e) {
+//            e.printStackTrace();
+//        }
+//        URL u=null;
+//        int smsnum=0;
+//        HttpURLConnection connection=null;
+//        try{
+//           String result= postRequestUrl(url, jsonParam.toString(), "utf-8");
+//            return "send success";
+//        }catch(Exception e){
+//            e.printStackTrace();
+//            return null;
+//        }
+//
+//    }
+//
+//
+//    public static String postRequestUrl(String url, String param,String encode) {
+//        OutputStreamWriter out = null;
+//        BufferedReader reader = null;
+//        String response="";
+//        try {
+//            URL httpUrl = null;
+//            httpUrl = new URL(url);
+//            HttpURLConnection conn = (HttpURLConnection) httpUrl.openConnection();
+//            conn.setRequestMethod("POST");
+//            conn.setRequestProperty("Content-Type", "application/json");//x-www-form-urlencoded
+//            conn.setRequestProperty("connection", "keep-alive");
+//            conn.setUseCaches(false);
+//            conn.setInstanceFollowRedirects(true);
+//            conn.setDoOutput(true);
+//            conn.setDoInput(true);
+//            conn.connect();
+//            //POST
+//            out = new OutputStreamWriter(
+//                    conn.getOutputStream());
+//            out.write(param);
+//
+//            out.flush();
+//
+//            System.out.println("send POST "+conn.getResponseCode());
+//            reader = new BufferedReader(new InputStreamReader(
+//                    conn.getInputStream()));
+//
+//            String lines;
+//            while ((lines = reader.readLine()) != null) {
+//                lines = new String(lines.getBytes(), "utf-8");
+//                response+=lines;
+//                System.out.println("lines:  "+lines);
+//            }
+//            reader.close();
+//            conn.disconnect();
+//
+//            //log.info(response.toString());
+//        } catch (Exception e) {
+//            System.out.println("send POST error!"+e);
+//            e.printStackTrace();
+//        }
+//        finally{
+//            try{
+//                if(out!=null){
+//                    out.close();
+//                }
+//                if(reader!=null){
+//                    reader.close();
+//                }
+//            }
+//            catch(IOException ex){
+//                ex.printStackTrace();
+//            }
+//        }
+//
+//        return response;
+//    }
+//
+//
+//
+//    private static String readBufferedContent(BufferedReader bufferedReader) {
+//        if (bufferedReader == null)
+//            return null;
+//        StringBuffer result = new StringBuffer();
+//        String line = null;
+//        try {
+//            while (StringUtils.isNotBlank((line = bufferedReader.readLine()))) {
+//                result.append(line+"\r\n");
+//            }
+//        } catch (IOException e) {
+//            e.printStackTrace();
+//            return null;
+//        }
+//        return result.toString();
+//    }
+//
+//}