Posted to commits@griffin.apache.org by gu...@apache.org on 2018/06/11 07:58:45 UTC
incubator-griffin git commit: refactor main process of batch and streaming measure
Repository: incubator-griffin
Updated Branches:
refs/heads/master ff3098ffd -> ed5ac8730
refactor main process of batch and streaming measure
Author: Lionel Liu <bh...@163.com>
Author: dodobel <12...@qq.com>
Closes #294 from bhlx3lyx7/spark2.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/ed5ac873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/ed5ac873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/ed5ac873
Branch: refs/heads/master
Commit: ed5ac8730e685b54c38494c26bf07c0603e8fa67
Parents: ff3098f
Author: Lionel Liu <bh...@163.com>
Authored: Mon Jun 11 15:58:39 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Mon Jun 11 15:58:39 2018 +0800
----------------------------------------------------------------------
.../apache/griffin/measure/Application.scala | 16 +-
.../griffin/measure/context/DQContext.scala | 2 +
.../measure/context/datasource/DataSource.scala | 14 +-
.../context/datasource/DataSourceFactory.scala | 9 +-
.../datasource/cache/DataSourceCache.scala | 366 -------------------
.../cache/DataSourceCacheFactory.scala | 68 ----
.../datasource/cache/JsonDataSourceCache.scala | 40 --
.../datasource/cache/OrcDataSourceCache.scala | 40 --
.../cache/ParquetDataSourceCache.scala | 42 ---
.../datasource/cache/StreamingCacheClient.scala | 366 +++++++++++++++++++
.../cache/StreamingCacheClientFactory.scala | 68 ++++
.../cache/StreamingCacheJsonClient.scala | 40 ++
.../cache/StreamingCacheOrcClient.scala | 40 ++
.../cache/StreamingCacheParquetClient.scala | 42 +++
.../connector/DataConnectorFactory.scala | 16 +-
.../streaming/KafkaStreamingDataConnector.scala | 4 +-
.../KafkaStreamingStringDataConnector.scala | 4 +-
.../streaming/StreamingDataConnector.scala | 4 +-
.../apache/griffin/measure/launch/DQApp.scala | 5 +-
.../measure/launch/batch/BatchDQApp.scala | 6 +-
.../launch/streaming/StreamingDQApp.scala | 104 +++++-
.../launch/streaming/StreamingDQApp2.scala | 104 ------
.../step/builder/RuleParamStepBuilder.scala | 4 +-
.../dsl/transform/AccuracyExpr2DQSteps.scala | 4 +-
.../transform/DistinctnessExpr2DQSteps.scala | 4 +-
.../step/write/DataSourceUpdateWriteStep.scala | 61 ++++
.../step/write/DsCacheUpdateWriteStep.scala | 61 ----
measure/src/test/resources/env-batch.json | 9 -
28 files changed, 763 insertions(+), 780 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/Application.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/Application.scala b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
index 893ba2c..6580ffc 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/Application.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/Application.scala
@@ -77,7 +77,7 @@ object Application extends Loggable {
// choose process
val procType = ProcessType(allParam.dqParam.procType)
- val proc: DQApp = procType match {
+ val dqApp: DQApp = procType match {
case BatchProcessType => BatchDQApp(allParam)
case StreamingProcessType => StreamingDQApp(allParam)
case _ => {
@@ -88,8 +88,8 @@ object Application extends Loggable {
startup
- // process init
- proc.init match {
+ // dq app init
+ dqApp.init match {
case Success(_) => {
info("process init success")
}
@@ -100,15 +100,15 @@ object Application extends Loggable {
}
}
- // process run
- proc.run match {
+ // dq app run
+ dqApp.run match {
case Success(_) => {
info("process run success")
}
case Failure(ex) => {
error(s"process run error: ${ex.getMessage}")
- if (proc.retryable) {
+ if (dqApp.retryable) {
throw ex
} else {
shutdown
@@ -117,8 +117,8 @@ object Application extends Loggable {
}
}
- // process end
- proc.close match {
+ // dq app end
+ dqApp.close match {
case Success(_) => {
info("process end success")
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 43b61aa..1d7dc62 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
/**
* dq context: the context of each calculation
+ * unique context id in each calculation
+ * access the same spark session this app created
*/
case class DQContext(contextId: ContextId,
name: String,
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
index 09ab9ea..0ca61aa 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
import org.apache.griffin.measure.context.datasource.connector.DataConnector
import org.apache.griffin.measure.context.datasource.info.TmstCache
@@ -32,12 +32,12 @@ import org.apache.spark.sql._
* @param name name of data source
* @param dsParam param of this data source
* @param dataConnectors list of data connectors
- * @param dataSourceCacheOpt data source cache option in streaming mode
+ * @param streamingCacheClientOpt streaming data cache client option
*/
case class DataSource(name: String,
dsParam: DataSourceParam,
dataConnectors: Seq[DataConnector],
- dataSourceCacheOpt: Option[DataSourceCache]
+ streamingCacheClientOpt: Option[StreamingCacheClient]
) extends Loggable with Serializable {
def init(): Unit = {
@@ -67,7 +67,7 @@ case class DataSource(name: String,
case _ => None
}
}
- val caches = dataSourceCacheOpt match {
+ val caches = streamingCacheClientOpt match {
case Some(dsc) => dsc.readData() :: Nil
case _ => Nil
}
@@ -83,15 +83,15 @@ case class DataSource(name: String,
}
def updateData(df: DataFrame): Unit = {
- dataSourceCacheOpt.foreach(_.updateData(Some(df)))
+ streamingCacheClientOpt.foreach(_.updateData(Some(df)))
}
def cleanOldData(): Unit = {
- dataSourceCacheOpt.foreach(_.cleanOutTimeData)
+ streamingCacheClientOpt.foreach(_.cleanOutTimeData)
}
def processFinish(): Unit = {
- dataSourceCacheOpt.foreach(_.processFinish)
+ streamingCacheClientOpt.foreach(_.processFinish)
}
}
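The rename also makes the delegation pattern in DataSource easier to see: every streaming-only action is guarded by the Option, so a source built with streamingCacheClientOpt = None turns updateData, cleanOldData and processFinish into no-ops. A generic sketch of that guard, with a stand-in client rather than the project's StreamingCacheClient:

    // stand-in client, illustrative only
    case class FakeCacheClient(name: String) {
      def processFinish(): Unit = println(s"$name: submit cache info")
    }

    object DelegationSketch extends App {
      def finish(clientOpt: Option[FakeCacheClient]): Unit =
        clientOpt.foreach(_.processFinish())  // silently skipped when the Option is empty

      finish(None)                            // no streaming cache client: no-op
      finish(Some(FakeCacheClient("src1")))   // streaming source: forwards to the client
    }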
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
index a22b856..95c8de7 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCacheFactory
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClientFactory
import org.apache.griffin.measure.context.datasource.connector.{DataConnector, DataConnectorFactory}
import org.apache.griffin.measure.context.datasource.info.TmstCache
import org.apache.spark.sql.SparkSession
@@ -49,18 +49,19 @@ object DataSourceFactory extends Loggable {
val connectorParams = dataSourceParam.getConnectors
val tmstCache = TmstCache()
- val dataSourceCacheOpt = DataSourceCacheFactory.getDataSourceCacheOpt(
+ // for streaming data cache
+ val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
sparkSession.sqlContext, dataSourceParam.cache, name, index, tmstCache)
val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
- tmstCache, dataSourceCacheOpt) match {
+ tmstCache, streamingCacheClientOpt) match {
case Success(connector) => Some(connector)
case _ => None
}
}
- Some(DataSource(name, dataSourceParam, dataConnectors, dataSourceCacheOpt))
+ Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
deleted file mode 100644
index c70fd20..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCache.scala
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache}
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
-
-import scala.util.Random
-
-/**
- * data source cache in streaming mode
- * save data frame into hdfs in dump phase
- * read data frame from hdfs in calculate phase
- * with update and clean actions for the cache data
- */
-trait DataSourceCache extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable {
-
- val sqlContext: SQLContext
- val param: Map[String, Any]
- val dsName: String
- val index: Int
-
- val tmstCache: TmstCache
- protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until)
- protected def clearTmst(t: Long) = tmstCache.remove(t)
- protected def clearTmstsUntil(until: Long) = {
- val outDateTmsts = tmstCache.until(until)
- tmstCache.remove(outDateTmsts)
- }
- protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
- protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
-
- val _FilePath = "file.path"
- val _InfoPath = "info.path"
- val _ReadyTimeInterval = "ready.time.interval"
- val _ReadyTimeDelay = "ready.time.delay"
- val _TimeRange = "time.range"
-
- val rdmStr = Random.alphanumeric.take(10).mkString
- val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
- val defInfoPath = s"${index}"
-
- val filePath: String = param.getString(_FilePath, defFilePath)
- val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
- val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
- val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
- val deltaTimeRange: (Long, Long) = {
- def negative(n: Long): Long = if (n <= 0) n else 0
- param.get(_TimeRange) match {
- case Some(seq: Seq[String]) => {
- val nseq = seq.flatMap(TimeUtil.milliseconds(_))
- val ns = negative(nseq.headOption.getOrElse(0))
- val ne = negative(nseq.tail.headOption.getOrElse(0))
- (ns, ne)
- }
- case _ => (0, 0)
- }
- }
-
- val _ReadOnly = "read.only"
- val readOnly = param.getBoolean(_ReadOnly, false)
-
- val _Updatable = "updatable"
- val updatable = param.getBoolean(_Updatable, false)
-
- val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
- val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
- val newFilePath = s"${filePath}/new"
- val oldFilePath = s"${filePath}/old"
-
- val defOldCacheIndex = 0L
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
-
- /**
- * save data frame in dump phase
- * @param dfOpt data frame to be saved
- * @param ms timestamp of this data frame
- */
- def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
- if (!readOnly) {
- dfOpt match {
- case Some(df) => {
- // cache df
- df.cache
-
- // cache df
- val cnt = df.count
- info(s"save ${dsName} data count: ${cnt}")
-
- if (cnt > 0) {
- // lock makes it safer when writing new cache data
- val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
- if (newCacheLocked) {
- try {
- val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
- writeDataFrame(dfw, newFilePath)
- } catch {
- case e: Throwable => error(s"save data error: ${e.getMessage}")
- } finally {
- newCacheLock.unlock()
- }
- }
- }
-
- // uncache df
- df.unpersist
- }
- case _ => {
- info(s"no data frame to save")
- }
- }
-
- // submit cache time and ready time
- if (fanIncrement(ms)) {
- info(s"save data [${ms}] finish")
- submitCacheTime(ms)
- submitReadyTime(ms)
- }
-
- }
- }
-
- /**
- * read data frame in calculation phase
- * @return data frame to calculate, with the time range of data
- */
- def readData(): (Option[DataFrame], TimeRange) = {
- // time range: (a, b]
- val timeRange = TimeInfoCache.getTimeRange
- val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-
- // read partition info
- val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
- info(s"read time range: [${reviseTimeRange._1}]")
- s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
- } else {
- info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
- s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
- }
-
- // new cache data
- val newDfOpt = try {
- val dfr = sqlContext.read
- Some(readDataFrame(dfr, newFilePath).filter(filterStr))
- } catch {
- case e: Throwable => {
- warn(s"read data source cache warn: ${e.getMessage}")
- None
- }
- }
-
- // old cache data
- val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
- val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
- val oldDfPath = s"${oldFilePath}/${idx}"
- try {
- val dfr = sqlContext.read
- Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
- } catch {
- case e: Throwable => {
- warn(s"read old data source cache warn: ${e.getMessage}")
- None
- }
- }
- }
-
- // whole cache data
- val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
-
- // from until tmst range
- val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
- val tmstSet = afterTilRangeTmsts(from, until)
-
- val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
- (cacheDfOpt, retTimeRange)
- }
-
- private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
- func: (Long, Long) => Boolean
- ): Unit = {
- val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func)
- // delete out time data path
- earlierOrEqPaths.foreach { path =>
- info(s"delete hdfs path: ${path}")
- HdfsUtil.deleteHdfsPath(path)
- }
- }
- private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
- func: (Long, Long) => Boolean
- ): Iterable[String] = {
- val names = HdfsUtil.listSubPathsByType(path, "dir")
- val regex = partitionOpt match {
- case Some(partition) => s"^${partition}=(\\d+)$$".r
- case _ => "^(\\d+)$".r
- }
- names.filter { name =>
- name match {
- case regex(value) => {
- str2Long(value) match {
- case Some(t) => func(t, bound)
- case _ => false
- }
- }
- case _ => false
- }
- }.map(name => s"${path}/${name}")
- }
- private def str2Long(str: String): Option[Long] = {
- try {
- Some(str.toLong)
- } catch {
- case e: Throwable => None
- }
- }
-
- /**
- * clean out-time cached data on hdfs
- */
- def cleanOutTimeData(): Unit = {
- // clean tmst
- val cleanTime = readCleanTime
- cleanTime.foreach(clearTmstsTil(_))
-
- if (!readOnly) {
- // new cache data
- val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
- newCacheCleanTime match {
- case Some(nct) => {
- // clean calculated new cache data
- val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
- if (newCacheLocked) {
- try {
- cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
- (a: Long, b: Long) => (a <= b))
- } catch {
- case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
- } finally {
- newCacheLock.unlock()
- }
- }
- }
- case _ => {
- // do nothing
- }
- }
-
- // old cache data
- val oldCacheCleanTime = if (updatable) readCleanTime else None
- oldCacheCleanTime match {
- case Some(oct) => {
- val oldCacheIndexOpt = readOldCacheIndex
- oldCacheIndexOpt.foreach { idx =>
- val oldDfPath = s"${oldFilePath}/${idx}"
- val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
- if (oldCacheLocked) {
- try {
- // clean calculated old cache data
- cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
- // clean out time old cache data not calculated
-// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
- } catch {
- case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
- } finally {
- oldCacheLock.unlock()
- }
- }
- }
- }
- case _ => {
- // do nothing
- }
- }
- }
- }
-
- /**
- * update old cached data by new data frame
- * @param dfOpt data frame to update old cached data
- */
- def updateData(dfOpt: Option[DataFrame]): Unit = {
- if (!readOnly && updatable) {
- dfOpt match {
- case Some(df) => {
- // old cache lock
- val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
- if (oldCacheLocked) {
- try {
- val oldCacheIndexOpt = readOldCacheIndex
- val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
-
- val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
- val cleanTime = getNextCleanTime
- val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
- val updateDf = df.filter(filterStr)
-
- val prlCount = sqlContext.sparkContext.defaultParallelism
- // repartition
- val repartitionedDf = updateDf.repartition(prlCount)
- val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
- writeDataFrame(dfw, oldDfPath)
-
- submitOldCacheIndex(nextOldCacheIndex)
- } catch {
- case e: Throwable => error(s"update data error: ${e.getMessage}")
- } finally {
- oldCacheLock.unlock()
- }
- }
- }
- case _ => {
- info(s"no data frame to update")
- }
- }
- }
- }
-
- /**
- * each time calculation phase finishes,
- * data source cache needs to submit some cache information
- */
- def processFinish(): Unit = {
- // next last proc time
- val timeRange = TimeInfoCache.getTimeRange
- submitLastProcTime(timeRange._2)
-
- // next clean time
- val nextCleanTime = timeRange._2 + deltaTimeRange._1
- submitCleanTime(nextCleanTime)
- }
-
- // read next clean time
- private def getNextCleanTime(): Long = {
- val timeRange = TimeInfoCache.getTimeRange
- val nextCleanTime = timeRange._2 + deltaTimeRange._1
- nextCleanTime
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
deleted file mode 100644
index ca882e0..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/DataSourceCacheFactory.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
-
-object DataSourceCacheFactory extends Loggable {
-
- private object DataSourceCacheType {
- val ParquetRegex = "^(?i)parq(uet)?$".r
- val JsonRegex = "^(?i)json$".r
- val OrcRegex = "^(?i)orc$".r
- }
- import DataSourceCacheType._
-
- val _type = "type"
-
- /**
- * create data source cache
- * @param sqlContext sqlContext in spark environment
- * @param param data source cache config
- * @param name data source name
- * @param index data source index
- * @param tmstCache the same tmstCache instance inside a data source
- * @return data source option
- */
- def getDataSourceCacheOpt(sqlContext: SQLContext, param: Map[String, Any],
- name: String, index: Int, tmstCache: TmstCache
- ): Option[DataSourceCache] = {
- if (param != null) {
- try {
- val tp = param.getString(_type, "")
- val dsCache = tp match {
- case ParquetRegex() => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache)
- case JsonRegex() => JsonDataSourceCache(sqlContext, param, name, index, tmstCache)
- case OrcRegex() => OrcDataSourceCache(sqlContext, param, name, index, tmstCache)
- case _ => ParquetDataSourceCache(sqlContext, param, name, index, tmstCache)
- }
- Some(dsCache)
- } catch {
- case e: Throwable => {
- error(s"generate data source cache fails")
- None
- }
- }
- } else None
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
deleted file mode 100644
index cb01274..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/JsonDataSourceCache.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
- * data source cache in json format
- */
-case class JsonDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, tmstCache: TmstCache
- ) extends DataSourceCache {
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
- info(s"write path: ${path}")
- dfw.json(path)
- }
-
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
- dfr.json(path)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
deleted file mode 100644
index daba15f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/OrcDataSourceCache.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
- * data source cache in orc format
- */
-case class OrcDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, tmstCache: TmstCache
- ) extends DataSourceCache {
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
- info(s"write path: ${path}")
- dfw.orc(path)
- }
-
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
- dfr.orc(path)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
deleted file mode 100644
index f00c6a3..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/ParquetDataSourceCache.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
- * data source cache in parquet format
- */
-case class ParquetDataSourceCache(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, tmstCache: TmstCache
- ) extends DataSourceCache {
-
- sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
- info(s"write path: ${path}")
- dfw.parquet(path)
- }
-
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
- dfr.parquet(path)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
new file mode 100644
index 0000000..1c3ca60
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache}
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.spark.sql._
+
+import scala.util.Random
+
+/**
+ * data source cache in streaming mode
+ * save data frame into hdfs in dump phase
+ * read data frame from hdfs in calculate phase
+ * with update and clean actions for the cache data
+ */
+trait StreamingCacheClient extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable {
+
+ val sqlContext: SQLContext
+ val param: Map[String, Any]
+ val dsName: String
+ val index: Int
+
+ val tmstCache: TmstCache
+ protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until)
+ protected def clearTmst(t: Long) = tmstCache.remove(t)
+ protected def clearTmstsUntil(until: Long) = {
+ val outDateTmsts = tmstCache.until(until)
+ tmstCache.remove(outDateTmsts)
+ }
+ protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
+ protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
+
+ val _FilePath = "file.path"
+ val _InfoPath = "info.path"
+ val _ReadyTimeInterval = "ready.time.interval"
+ val _ReadyTimeDelay = "ready.time.delay"
+ val _TimeRange = "time.range"
+
+ val rdmStr = Random.alphanumeric.take(10).mkString
+ val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
+ val defInfoPath = s"${index}"
+
+ val filePath: String = param.getString(_FilePath, defFilePath)
+ val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
+ val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+ val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+ val deltaTimeRange: (Long, Long) = {
+ def negative(n: Long): Long = if (n <= 0) n else 0
+ param.get(_TimeRange) match {
+ case Some(seq: Seq[String]) => {
+ val nseq = seq.flatMap(TimeUtil.milliseconds(_))
+ val ns = negative(nseq.headOption.getOrElse(0))
+ val ne = negative(nseq.tail.headOption.getOrElse(0))
+ (ns, ne)
+ }
+ case _ => (0, 0)
+ }
+ }
+
+ val _ReadOnly = "read.only"
+ val readOnly = param.getBoolean(_ReadOnly, false)
+
+ val _Updatable = "updatable"
+ val updatable = param.getBoolean(_Updatable, false)
+
+ val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
+ val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
+
+ val newFilePath = s"${filePath}/new"
+ val oldFilePath = s"${filePath}/old"
+
+ val defOldCacheIndex = 0L
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
+
+ /**
+ * save data frame in dump phase
+ * @param dfOpt data frame to be saved
+ * @param ms timestamp of this data frame
+ */
+ def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+ if (!readOnly) {
+ dfOpt match {
+ case Some(df) => {
+ // cache df
+ df.cache
+
+ // cache df
+ val cnt = df.count
+ info(s"save ${dsName} data count: ${cnt}")
+
+ if (cnt > 0) {
+ // lock makes it safer when writing new cache data
+ val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (newCacheLocked) {
+ try {
+ val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
+ writeDataFrame(dfw, newFilePath)
+ } catch {
+ case e: Throwable => error(s"save data error: ${e.getMessage}")
+ } finally {
+ newCacheLock.unlock()
+ }
+ }
+ }
+
+ // uncache df
+ df.unpersist
+ }
+ case _ => {
+ info(s"no data frame to save")
+ }
+ }
+
+ // submit cache time and ready time
+ if (fanIncrement(ms)) {
+ info(s"save data [${ms}] finish")
+ submitCacheTime(ms)
+ submitReadyTime(ms)
+ }
+
+ }
+ }
+
+ /**
+ * read data frame in calculation phase
+ * @return data frame to calculate, with the time range of data
+ */
+ def readData(): (Option[DataFrame], TimeRange) = {
+ // time range: (a, b]
+ val timeRange = TimeInfoCache.getTimeRange
+ val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+
+ // read partition info
+ val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
+ info(s"read time range: [${reviseTimeRange._1}]")
+ s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
+ } else {
+ info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
+ s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+ }
+
+ // new cache data
+ val newDfOpt = try {
+ val dfr = sqlContext.read
+ Some(readDataFrame(dfr, newFilePath).filter(filterStr))
+ } catch {
+ case e: Throwable => {
+ warn(s"read data source cache warn: ${e.getMessage}")
+ None
+ }
+ }
+
+ // old cache data
+ val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
+ val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
+ val oldDfPath = s"${oldFilePath}/${idx}"
+ try {
+ val dfr = sqlContext.read
+ Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
+ } catch {
+ case e: Throwable => {
+ warn(s"read old data source cache warn: ${e.getMessage}")
+ None
+ }
+ }
+ }
+
+ // whole cache data
+ val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
+
+ // from until tmst range
+ val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
+ val tmstSet = afterTilRangeTmsts(from, until)
+
+ val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+ (cacheDfOpt, retTimeRange)
+ }
+
+ private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
+ func: (Long, Long) => Boolean
+ ): Unit = {
+ val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func)
+ // delete out time data path
+ earlierOrEqPaths.foreach { path =>
+ info(s"delete hdfs path: ${path}")
+ HdfsUtil.deleteHdfsPath(path)
+ }
+ }
+ private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
+ func: (Long, Long) => Boolean
+ ): Iterable[String] = {
+ val names = HdfsUtil.listSubPathsByType(path, "dir")
+ val regex = partitionOpt match {
+ case Some(partition) => s"^${partition}=(\\d+)$$".r
+ case _ => "^(\\d+)$".r
+ }
+ names.filter { name =>
+ name match {
+ case regex(value) => {
+ str2Long(value) match {
+ case Some(t) => func(t, bound)
+ case _ => false
+ }
+ }
+ case _ => false
+ }
+ }.map(name => s"${path}/${name}")
+ }
+ private def str2Long(str: String): Option[Long] = {
+ try {
+ Some(str.toLong)
+ } catch {
+ case e: Throwable => None
+ }
+ }
+
+ /**
+ * clean out-time cached data on hdfs
+ */
+ def cleanOutTimeData(): Unit = {
+ // clean tmst
+ val cleanTime = readCleanTime
+ cleanTime.foreach(clearTmstsTil(_))
+
+ if (!readOnly) {
+ // new cache data
+ val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
+ newCacheCleanTime match {
+ case Some(nct) => {
+ // clean calculated new cache data
+ val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (newCacheLocked) {
+ try {
+ cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
+ (a: Long, b: Long) => (a <= b))
+ } catch {
+ case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
+ } finally {
+ newCacheLock.unlock()
+ }
+ }
+ }
+ case _ => {
+ // do nothing
+ }
+ }
+
+ // old cache data
+ val oldCacheCleanTime = if (updatable) readCleanTime else None
+ oldCacheCleanTime match {
+ case Some(oct) => {
+ val oldCacheIndexOpt = readOldCacheIndex
+ oldCacheIndexOpt.foreach { idx =>
+ val oldDfPath = s"${oldFilePath}/${idx}"
+ val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (oldCacheLocked) {
+ try {
+ // clean calculated old cache data
+ cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
+ // clean out time old cache data not calculated
+// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
+ } catch {
+ case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
+ } finally {
+ oldCacheLock.unlock()
+ }
+ }
+ }
+ }
+ case _ => {
+ // do nothing
+ }
+ }
+ }
+ }
+
+ /**
+ * update old cached data by new data frame
+ * @param dfOpt data frame to update old cached data
+ */
+ def updateData(dfOpt: Option[DataFrame]): Unit = {
+ if (!readOnly && updatable) {
+ dfOpt match {
+ case Some(df) => {
+ // old cache lock
+ val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (oldCacheLocked) {
+ try {
+ val oldCacheIndexOpt = readOldCacheIndex
+ val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
+
+ val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
+ val cleanTime = getNextCleanTime
+ val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
+ val updateDf = df.filter(filterStr)
+
+ val prlCount = sqlContext.sparkContext.defaultParallelism
+ // repartition
+ val repartitionedDf = updateDf.repartition(prlCount)
+ val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
+ writeDataFrame(dfw, oldDfPath)
+
+ submitOldCacheIndex(nextOldCacheIndex)
+ } catch {
+ case e: Throwable => error(s"update data error: ${e.getMessage}")
+ } finally {
+ oldCacheLock.unlock()
+ }
+ }
+ }
+ case _ => {
+ info(s"no data frame to update")
+ }
+ }
+ }
+ }
+
+ /**
+ * each time calculation phase finishes,
+ * data source cache needs to submit some cache information
+ */
+ def processFinish(): Unit = {
+ // next last proc time
+ val timeRange = TimeInfoCache.getTimeRange
+ submitLastProcTime(timeRange._2)
+
+ // next clean time
+ val nextCleanTime = timeRange._2 + deltaTimeRange._1
+ submitCleanTime(nextCleanTime)
+ }
+
+ // read next clean time
+ private def getNextCleanTime(): Long = {
+ val timeRange = TimeInfoCache.getTimeRange
+ val nextCleanTime = timeRange._2 + deltaTimeRange._1
+ nextCleanTime
+ }
+
+}
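The keys this trait reads suggest the shape of a cache config block. An illustrative map, with placeholder values; the time.range entries are an assumption based on the Seq[String] match above:

    // illustrative only: keys come from the fields above, values are made up
    val cacheParam: Map[String, Any] = Map(
      "file.path"           -> "hdfs:///griffin/cache/src1",  // default: hdfs:///griffin/cache/<dsName>_<random>
      "info.path"           -> "src1",                        // default: the data source index
      "ready.time.interval" -> "1m",
      "ready.time.delay"    -> "1m",
      "time.range"          -> Seq("-2m", "0"),               // positive offsets are clamped to 0
      "read.only"           -> false,
      "updatable"           -> true
    )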
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
new file mode 100644
index 0000000..529b07a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql.SQLContext
+
+object StreamingCacheClientFactory extends Loggable {
+
+ private object DataSourceCacheType {
+ val ParquetRegex = "^(?i)parq(uet)?$".r
+ val JsonRegex = "^(?i)json$".r
+ val OrcRegex = "^(?i)orc$".r
+ }
+ import DataSourceCacheType._
+
+ val _type = "type"
+
+ /**
+ * create streaming cache client
+ * @param sqlContext sqlContext in spark environment
+ * @param param data source cache config
+ * @param name data source name
+ * @param index data source index
+ * @param tmstCache the same tmstCache instance inside a data source
+ * @return streaming cache client option
+ */
+ def getClientOpt(sqlContext: SQLContext, param: Map[String, Any],
+ name: String, index: Int, tmstCache: TmstCache
+ ): Option[StreamingCacheClient] = {
+ if (param != null) {
+ try {
+ val tp = param.getString(_type, "")
+ val dsCache = tp match {
+ case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+ case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+ case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ }
+ Some(dsCache)
+ } catch {
+ case e: Throwable => {
+ error(s"generate data source cache fails")
+ None
+ }
+ }
+ } else None
+ }
+
+}
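Wiring mirrors what DataSourceFactory does earlier in this commit. A hedged call-site sketch, assuming a SparkSession is in scope; "parquet" is the fallback when type is missing or unrecognized, and a null config yields None:

    // hypothetical call site, same shape as the DataSourceFactory change above
    val cacheConf: Map[String, Any] = Map("type" -> "parquet", "info.path" -> "src1")
    val clientOpt: Option[StreamingCacheClient] =
      StreamingCacheClientFactory.getClientOpt(
        sparkSession.sqlContext,  // assumed SparkSession in scope
        cacheConf,                // null here would return None
        "src1",                   // data source name
        0,                        // data source index
        TmstCache())              // shared timestamp cache for the data source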
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
new file mode 100644
index 0000000..494db3e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+ * data source cache in json format
+ */
+case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
+ dsName: String, index: Int, tmstCache: TmstCache
+ ) extends StreamingCacheClient {
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+ info(s"write path: ${path}")
+ dfw.json(path)
+ }
+
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+ dfr.json(path)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
new file mode 100644
index 0000000..6e0f142
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+ * data source cache in orc format
+ */
+case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
+ dsName: String, index: Int, tmstCache: TmstCache
+ ) extends StreamingCacheClient {
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+ info(s"write path: ${path}")
+ dfw.orc(path)
+ }
+
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+ dfr.orc(path)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
new file mode 100644
index 0000000..d99bc58
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.datasource.cache
+
+import org.apache.griffin.measure.context.datasource.info.TmstCache
+import org.apache.spark.sql._
+
+/**
+ * data source cache in parquet format
+ */
+case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
+ dsName: String, index: Int, tmstCache: TmstCache
+ ) extends StreamingCacheClient {
+
+ sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+ info(s"write path: ${path}")
+ dfw.parquet(path)
+ }
+
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+ dfr.parquet(path)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
index ea22309..30352a9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource.connector
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.context.datasource.connector.batch._
import org.apache.griffin.measure.context.datasource.connector.streaming._
import org.apache.griffin.measure.context.datasource.info.TmstCache
@@ -44,14 +44,14 @@ object DataConnectorFactory extends Loggable {
* @param ssc spark streaming env
* @param dcParam data connector param
* @param tmstCache same tmst cache in one data source
- * @param dataSourceCacheOpt for streaming data connector
+ * @param streamingCacheClientOpt for streaming cache
* @return data connector
*/
def getDataConnector(sparkSession: SparkSession,
ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TmstCache,
- dataSourceCacheOpt: Option[DataSourceCache]
+ streamingCacheClientOpt: Option[StreamingCacheClient]
): Try[DataConnector] = {
val conType = dcParam.conType
val version = dcParam.version
@@ -61,7 +61,7 @@ object DataConnectorFactory extends Loggable {
case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
case KafkaRegex() => {
- getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+ getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
}
case _ => throw new Exception("connector creation error!")
}
@@ -72,13 +72,13 @@ object DataConnectorFactory extends Loggable {
ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TmstCache,
- dataSourceCacheOpt: Option[DataSourceCache]
+ streamingCacheClientOpt: Option[StreamingCacheClient]
): StreamingDataConnector = {
if (ssc == null) throw new Exception("streaming context is null!")
val conType = dcParam.conType
val version = dcParam.version
conType match {
- case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+ case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
case _ => throw new Exception("streaming connector creation error!")
}
}
@@ -87,7 +87,7 @@ object DataConnectorFactory extends Loggable {
ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TmstCache,
- dataSourceCacheOpt: Option[DataSourceCache]
+ streamingCacheClientOpt: Option[StreamingCacheClient]
): KafkaStreamingDataConnector = {
val KeyType = "key.type"
val ValueType = "value.type"
@@ -96,7 +96,7 @@ object DataConnectorFactory extends Loggable {
val valueType = config.getOrElse(ValueType, "java.lang.String").toString
(getClassTag(keyType), getClassTag(valueType)) match {
case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
- KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, dataSourceCacheOpt)
+ KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
}
case _ => {
throw new Exception("not supported type kafka data connector")
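The ClassTag match above means only java.lang.String key/value types are accepted for the Kafka connector; anything else hits the "not supported type kafka data connector" exception. A sketch of the two config keys the factory reads (both default to "java.lang.String"; other connector settings omitted):

    // only the two keys read by getKafkaDataConnector above
    val kafkaConnectorConfig: Map[String, Any] = Map(
      "key.type"   -> "java.lang.String",
      "value.type" -> "java.lang.String"
    )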
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
index 9fe4876..de2822b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -43,7 +43,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
def init(): Unit = {
// register fan in
- dataSourceCacheOpt.foreach(_.registerFanIn)
+ streamingCacheClientOpt.foreach(_.registerFanIn)
val ds = stream match {
case Success(dstream) => dstream
@@ -71,7 +71,7 @@ trait KafkaStreamingDataConnector extends StreamingDataConnector {
}
// save data frame
- dataSourceCacheOpt.foreach(_.saveData(saveDfOpt, ms))
+ streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
})
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
index b483933..3083ca6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context.datasource.connector.streaming
import kafka.serializer.StringDecoder
import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.context.datasource.info.TmstCache
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
@@ -36,7 +36,7 @@ case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSessi
@transient ssc: StreamingContext,
dcParam: DataConnectorParam,
tmstCache: TmstCache,
- dataSourceCacheOpt: Option[DataSourceCache]
+ streamingCacheClientOpt: Option[StreamingCacheClient]
) extends KafkaStreamingDataConnector {
type K = String
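The string connector fixes K and V to String and, per the imports above, turns each raw Kafka value into rows of a string-typed schema. A small sketch of that value-to-data-frame step, assuming an illustrative schema and column name:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    object StringValueSchemaSketch {
      // single string column standing in for the connector's value schema
      val valueSchema = StructType(Seq(StructField("value", StringType)))

      def toDataFrame(spark: SparkSession, values: Seq[String]) = {
        val rows = values.map(Row(_))   // wrap each kafka value string in a Row
        spark.createDataFrame(spark.sparkContext.parallelize(rows), valueSchema)
      }
    }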
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
index 3b2c355..737bc21 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
@@ -19,7 +19,7 @@ under the License.
package org.apache.griffin.measure.context.datasource.connector.streaming
import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.cache.DataSourceCache
+import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
import org.apache.griffin.measure.context.datasource.connector.DataConnector
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
@@ -41,6 +41,6 @@ trait StreamingDataConnector extends DataConnector {
// streaming data connector cannot directly read data frame
def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
- val dataSourceCacheOpt: Option[DataSourceCache]
+ val streamingCacheClientOpt: Option[StreamingCacheClient]
}
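Because data(ms) always yields (None, emptyTimeRange) for streaming connectors, the owning data source is expected to read cached mini-batches back through the client rather than through the connector. A minimal sketch of that fallback, assuming hypothetical stand-ins (TimeRangeSketch, readData and the types below are not Griffin's DataSource API):

    case class TimeRangeSketch(begin: Long, end: Long)
    object TimeRangeSketch { val empty = TimeRangeSketch(0L, 0L) }

    trait StreamingCacheClientSketch {
      def readData(): (Option[Seq[String]], TimeRangeSketch)  // Seq[String] stands in for DataFrame
    }

    class DataSourceSketch(streamingCacheClientOpt: Option[StreamingCacheClientSketch]) {
      def loadData(ms: Long): (Option[Seq[String]], TimeRangeSketch) = {
        streamingCacheClientOpt match {
          case Some(client) => client.readData()              // streaming: read back from the cache
          case None         => (None, TimeRangeSketch.empty)  // no cache client configured
        }
      }
    }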
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 42364a8..9ec1641 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -44,7 +44,10 @@ trait DQApp extends Loggable with Serializable {
*/
def retryable: Boolean
- protected def getAppTime: Long = {
+ /**
+ * timestamp as a key for metrics
+ */
+ protected def getMeasureTime: Long = {
if (dqParam.timestamp != null && dqParam.timestamp > 0) { dqParam.timestamp }
else { System.currentTimeMillis }
}
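getMeasureTime prefers an explicitly configured positive timestamp and otherwise falls back to the current time, so a re-run can be pinned to a fixed metrics key. A one-method sketch of the same rule, assuming a plain Option in place of dqParam:

    object MeasureTimeSketch {
      // configured timestamp wins when present and positive; otherwise use "now"
      def getMeasureTime(configuredTimestamp: Option[Long]): Long =
        configuredTimestamp.filter(_ > 0).getOrElse(System.currentTimeMillis)
    }

    // getMeasureTime(Some(1528704000000L)) == 1528704000000L; getMeasureTime(None) is the current time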
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index fec27b1..1aa5039 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -66,10 +66,10 @@ case class BatchDQApp(allParam: AllParam) extends DQApp {
// start time
val startTime = new Date().getTime
- val appTime = getAppTime
- val contextId = ContextId(appTime)
+ val measureTime = getMeasureTime
+ val contextId = ContextId(measureTime)
- // generate data sources
+ // get data sources
val dataSources = DataSourceFactory.getDataSources(sparkSession, null, dqParam.dataSources)
dataSources.foreach(_.init)
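The batch app stamps one measure time, derives the context id from it, and initializes every data source before the dq job runs. A compact sketch of that start-up sequence, assuming simplified stand-ins (ContextIdSketch and DataSourceSketch are not the real Griffin classes):

    case class ContextIdSketch(timestamp: Long)
    trait DataSourceSketch { def init(): Unit }

    def startBatch(measureTime: Long, dataSources: Seq[DataSourceSketch]): ContextIdSketch = {
      val contextId = ContextIdSketch(measureTime)  // all steps of this run share one context id
      dataSources.foreach(_.init())                 // get data sources ready before building the dq job
      contextId
    }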
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index e4a8108..d89b7e8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -18,14 +18,17 @@ under the License.
*/
package org.apache.griffin.measure.launch.streaming
-import java.util.{Timer, TimerTask}
+import java.util.{Date, Timer, TimerTask}
import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}
+import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params._
import org.apache.griffin.measure.context._
import org.apache.griffin.measure.context.datasource.DataSourceFactory
-import org.apache.griffin.measure.context.streaming.info.InfoCacheInstance
+import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.context.streaming.metric.CacheResults
+import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
@@ -87,8 +90,8 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
})
// start time
- val appTime = getAppTime
- val contextId = ContextId(appTime)
+ val measureTime = getMeasureTime
+ val contextId = ContextId(measureTime)
// generate data sources
val dataSources = DataSourceFactory.getDataSources(sparkSession, ssc, dqParam.dataSources)
@@ -104,13 +107,13 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
globalContext.getPersist().start(applicationId)
// process thread
- val dqThread = StreamingDQApp2(globalContext, dqParam.evaluateRule)
+ val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
val processInterval = TimeUtil.milliseconds(sparkParam.processInterval) match {
case Some(interval) => interval
case _ => throw new Exception("invalid batch interval")
}
- val process = TimingProcess(processInterval, dqThread)
+ val process = Scheduler(processInterval, dqCalculator)
process.startup()
ssc.start()
@@ -150,7 +153,94 @@ case class StreamingDQApp(allParam: AllParam) extends DQApp {
}
}
- case class TimingProcess(interval: Long, runnable: Runnable) {
+
+ /**
+ * streaming dq calculation runnable for one process window
+ * @param globalContext global dq context shared across calculation rounds
+ * @param evaluateRuleParam rule config used to build the dq job each round
+ */
+ case class StreamingDQCalculator(globalContext: DQContext,
+ evaluateRuleParam: EvaluateRuleParam
+ ) extends Runnable with Loggable {
+
+ val lock = InfoCacheInstance.genLock("process")
+ val appPersist = globalContext.getPersist()
+
+ def run(): Unit = {
+ val updateTimeDate = new Date()
+ val updateTime = updateTimeDate.getTime
+ println(s"===== [${updateTimeDate}] process begins =====")
+ val locked = lock.lock(5, TimeUnit.SECONDS)
+ if (locked) {
+ try {
+
+ TimeInfoCache.startTimeInfoCache
+
+ val startTime = new Date().getTime
+ appPersist.log(startTime, s"starting process ...")
+ val contextId = ContextId(startTime)
+
+ // create dq context
+ val dqContext: DQContext = globalContext.cloneDQContext(contextId)
+
+ // build job
+ val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
+
+ // dq job execute
+ dqJob.execute(dqContext)
+
+ // finish calculation
+ finishCalculation(dqContext)
+
+ // end time
+ val endTime = new Date().getTime
+ appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
+
+ TimeInfoCache.endTimeInfoCache
+
+ // clean old data
+ cleanData(dqContext)
+
+ } catch {
+ case e: Throwable => error(s"process error: ${e.getMessage}")
+ } finally {
+ lock.unlock()
+ }
+ } else {
+ println(s"===== [${updateTimeDate}] process ignores =====")
+ }
+ val endTime = new Date().getTime
+ println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
+ }
+
+ // finish calculation for this round
+ private def finishCalculation(context: DQContext): Unit = {
+ context.dataSources.foreach(_.processFinish)
+ }
+
+ // clean old data and old result cache
+ private def cleanData(context: DQContext): Unit = {
+ try {
+ context.dataSources.foreach(_.cleanOldData)
+
+ context.clean()
+
+ val cleanTime = TimeInfoCache.getCleanTime
+ CacheResults.refresh(cleanTime)
+ } catch {
+ case e: Throwable => error(s"clean data error: ${e.getMessage}")
+ }
+ }
+
+ }
+
+
+ /**
+ * schedules the given runnable at a fixed interval on a small thread pool
+ * @param interval scheduling interval in milliseconds
+ * @param runnable streaming dq calculation to run each interval
+ */
+ case class Scheduler(interval: Long, runnable: Runnable) {
val pool: ThreadPoolExecutor = Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
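Each StreamingDQCalculator run happens under a cluster-wide lock: start the time-info cache, clone the global context with a fresh context id, build and execute the dq job, finish the round, end the time-info cache, then clean old data and cached results. Scheduler drives that runnable at a fixed interval; only the thread pool is visible in this hunk, so the timer wiring and startup/shutdown in the sketch below are assumptions based on the Timer/TimerTask imports above:

    import java.util.{Timer, TimerTask}
    import java.util.concurrent.{Executors, ThreadPoolExecutor, TimeUnit}

    case class SchedulerSketch(interval: Long, runnable: Runnable) {
      private val pool: ThreadPoolExecutor =
        Executors.newFixedThreadPool(5).asInstanceOf[ThreadPoolExecutor]
      private val timer = new Timer("dq-scheduler", true)

      def startup(): Unit = {
        // submit the calculation to the pool every `interval` milliseconds
        timer.schedule(new TimerTask { def run(): Unit = pool.submit(runnable) }, interval, interval)
      }

      def shutdown(): Unit = {
        timer.cancel()
        pool.shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS)
      }
    }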
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
deleted file mode 100644
index 97ce980..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp2.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.launch.streaming
-
-import java.util.Date
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params._
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.context.streaming.metric.CacheResults
-import org.apache.griffin.measure.context.{ContextId, DQContext}
-import org.apache.griffin.measure.job.builder.DQJobBuilder
-
-case class StreamingDQApp2(globalContext: DQContext,
- evaluateRuleParam: EvaluateRuleParam
- ) extends Runnable with Loggable {
-
- val lock = InfoCacheInstance.genLock("process")
- val appPersist = globalContext.getPersist()
-
- def run(): Unit = {
- val updateTimeDate = new Date()
- val updateTime = updateTimeDate.getTime
- println(s"===== [${updateTimeDate}] process begins =====")
- val locked = lock.lock(5, TimeUnit.SECONDS)
- if (locked) {
- try {
-
- TimeInfoCache.startTimeInfoCache
-
- val startTime = new Date().getTime
- appPersist.log(startTime, s"starting process ...")
- val contextId = ContextId(startTime)
-
- // create dq context
- val dqContext: DQContext = globalContext.cloneDQContext(contextId)
-
- // build job
- val dqJob = DQJobBuilder.buildDQJob(dqContext, evaluateRuleParam)
-
- // dq job execute
- dqJob.execute(dqContext)
-
- // finish calculation
- finishCalculation(dqContext)
-
- // end time
- val endTime = new Date().getTime
- appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
-
- TimeInfoCache.endTimeInfoCache
-
- // clean old data
- cleanData(dqContext)
-
- } catch {
- case e: Throwable => error(s"process error: ${e.getMessage}")
- } finally {
- lock.unlock()
- }
- } else {
- println(s"===== [${updateTimeDate}] process ignores =====")
- }
- val endTime = new Date().getTime
- println(s"===== [${updateTimeDate}] process ends, using ${endTime - updateTime} ms =====")
- }
-
- // finish calculation for this round
- private def finishCalculation(context: DQContext): Unit = {
- context.dataSources.foreach(_.processFinish)
- }
-
- // clean old data and old result cache
- private def cleanData(context: DQContext): Unit = {
- try {
- context.dataSources.foreach(_.cleanOldData)
-
- context.clean()
-
- val cleanTime = TimeInfoCache.getCleanTime
- CacheResults.refresh(cleanTime)
- } catch {
- case e: Throwable => error(s"clean data error: ${e.getMessage}")
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index 9a2c7b5..fa9e38b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -21,7 +21,7 @@ package org.apache.griffin.measure.step.builder
import org.apache.griffin.measure.configuration.enums.NormalizeType
import org.apache.griffin.measure.configuration.params.RuleParam
import org.apache.griffin.measure.context.DQContext
-import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.step.{DQStep, SeqDQStep}
/**
@@ -52,7 +52,7 @@ trait RuleParamStepBuilder extends DQStepBuilder {
}.toSeq
// update writer
val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.map { dsCacheUpdate =>
- DsCacheUpdateWriteStep(dsCacheUpdate.dsName, name)
+ DataSourceUpdateWriteStep(dsCacheUpdate.dsName, name)
}.toSeq
metricSteps ++ recordSteps ++ dsCacheUpdateSteps
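Write steps for a rule are assembled from three sources: its metric exports, its record exports, and the optional dsCacheUpdateOpt, which now yields a DataSourceUpdateWriteStep fed by the rule's own output table. A simplified sketch of that composition, assuming the *Sketch types reduce the real params to the fields used here:

    sealed trait WriteStepSketch
    case class MetricWriteStepSketch(name: String) extends WriteStepSketch
    case class RecordWriteStepSketch(name: String) extends WriteStepSketch
    case class DataSourceUpdateWriteStepSketch(dsName: String, inputName: String) extends WriteStepSketch

    case class RuleParamSketch(name: String,
                               metricNames: Seq[String],
                               recordNames: Seq[String],
                               dsCacheUpdateOpt: Option[String])

    def buildWriteSteps(ruleParam: RuleParamSketch): Seq[WriteStepSketch] = {
      val metricSteps = ruleParam.metricNames.map(MetricWriteStepSketch)
      val recordSteps = ruleParam.recordNames.map(RecordWriteStepSketch)
      // the optional cache-update config becomes one update step reading this rule's output table
      val dsCacheUpdateSteps = ruleParam.dsCacheUpdateOpt.map { dsName =>
        DataSourceUpdateWriteStepSketch(dsName, ruleParam.name)
      }.toSeq
      metricSteps ++ recordSteps ++ dsCacheUpdateSteps
    }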
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index a0e5ca3..9c14325 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -27,7 +27,7 @@ import org.apache.griffin.measure.step.builder.dsl.expr._
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.AccuracyAnalyzer
import org.apache.griffin.measure.step.transform.DataFrameOps.AccuracyOprKeys
import org.apache.griffin.measure.step.transform.{DataFrameOps, DataFrameOpsTransformStep, SparkSqlTransformStep}
-import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -91,7 +91,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
case BatchProcessType => Nil
case StreamingProcessType => {
val dsName = ruleParam.dsCacheUpdateOpt.map(_.dsName).getOrElse(sourceName)
- DsCacheUpdateWriteStep(dsName, missRecordsTableName) :: Nil
+ DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
}
}
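In streaming accuracy the miss-records table is written back into the source's cache (presumably so unmatched records can be re-checked in later windows), while batch accuracy adds no such step. A sketch of that branch, assuming stand-in types for the process type and the write step:

    sealed trait ProcTypeSketch
    case object BatchProcess extends ProcTypeSketch
    case object StreamingProcess extends ProcTypeSketch
    case class DataSourceUpdateWriteStepSketch(dsName: String, inputName: String)

    def missRecordsUpdateSteps(procType: ProcTypeSketch,
                               dsCacheUpdateDsNameOpt: Option[String],
                               sourceName: String,
                               missRecordsTableName: String): Seq[DataSourceUpdateWriteStepSketch] = {
      procType match {
        case BatchProcess => Nil
        case StreamingProcess =>
          // fall back to the source name when no explicit cache-update target is configured
          val dsName = dsCacheUpdateDsNameOpt.getOrElse(sourceName)
          DataSourceUpdateWriteStepSketch(dsName, missRecordsTableName) :: Nil
      }
    }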
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 3482955..1cf94e0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.step.builder.dsl.expr.{DistinctnessClause, _}
import org.apache.griffin.measure.step.builder.dsl.transform.analyzer.DistinctnessAnalyzer
import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
-import org.apache.griffin.measure.step.write.{DsCacheUpdateWriteStep, MetricWriteStep, RecordWriteStep}
+import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
import org.apache.griffin.measure.utils.ParamUtil._
/**
@@ -133,7 +133,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
val ((transSteps2, writeSteps2), dupCountTableName) = procType match {
case StreamingProcessType if (withOlderTable) => {
// 4.0 update old data
- val targetDsUpdateWriteStep = DsCacheUpdateWriteStep(targetName, targetName)
+ val targetDsUpdateWriteStep = DataSourceUpdateWriteStep(targetName, targetName)
// 4. older alias
val olderAliasTableName = "__older"
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
new file mode 100644
index 0000000..0472416
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/write/DataSourceUpdateWriteStep.scala
@@ -0,0 +1,61 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.write
+
+import org.apache.commons.lang.StringUtils
+import org.apache.griffin.measure.context.DQContext
+import org.apache.spark.sql.DataFrame
+
+/**
+ * update data source streaming cache
+ */
+case class DataSourceUpdateWriteStep(dsName: String,
+ inputName: String
+ ) extends WriteStep {
+
+ val name: String = ""
+ val writeTimestampOpt: Option[Long] = None
+
+ def execute(context: DQContext): Boolean = {
+ collectDsCacheUpdateDf(context) match {
+ case Some(df) => {
+ context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
+ }
+ case _ => {
+ warn(s"update ${dsName} from ${inputName} fails")
+ }
+ }
+ true
+ }
+
+ private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
+ try {
+ val df = context.sqlContext.table(s"`${name}`")
+ Some(df)
+ } catch {
+ case e: Throwable => {
+ error(s"get data frame ${name} fails")
+ None
+ }
+ }
+ }
+
+ private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
+
+}
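The step's behaviour is deliberately forgiving: it resolves the rule's output table by name, pushes it into the data source whose name matches dsName, and merely warns (still returning true) when the table cannot be resolved. A minimal distillation of that pattern against a bare SparkSession, assuming illustrative function names that are not the step's API:

    import scala.util.Try
    import org.apache.spark.sql.{DataFrame, SparkSession}

    def collectUpdateDf(spark: SparkSession, inputName: String): Option[DataFrame] =
      Try(spark.table(s"`${inputName}`")).toOption   // registered temp table, if it exists

    def updateDataSource(spark: SparkSession, dsName: String, inputName: String,
                         updateData: DataFrame => Unit): Boolean = {
      collectUpdateDf(spark, inputName) match {
        case Some(df) => updateData(df)   // push the rule output back into the named source
        case None     => println(s"update ${dsName} from ${inputName} fails")
      }
      true                                // never fail the job, mirroring execute() above
    }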
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala b/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
deleted file mode 100644
index 27dbb3c..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/step/write/DsCacheUpdateWriteStep.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.step.write
-
-import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.context.DQContext
-import org.apache.spark.sql.DataFrame
-
-/**
- * update streaming data source cache
- */
-case class DsCacheUpdateWriteStep(dsName: String,
- inputName: String
- ) extends WriteStep {
-
- val name: String = ""
- val writeTimestampOpt: Option[Long] = None
-
- def execute(context: DQContext): Boolean = {
- collectDsCacheUpdateDf(context) match {
- case Some(df) => {
- context.dataSources.find(ds => StringUtils.equals(ds.name, dsName)).foreach(_.updateData(df))
- }
- case _ => {
- warn(s"update ${dsName} from ${inputName} fails")
- }
- }
- true
- }
-
- private def getDataFrame(context: DQContext, name: String): Option[DataFrame] = {
- try {
- val df = context.sqlContext.table(s"`${name}`")
- Some(df)
- } catch {
- case e: Throwable => {
- error(s"get data frame ${name} fails")
- None
- }
- }
- }
-
- private def collectDsCacheUpdateDf(context: DQContext): Option[DataFrame] = getDataFrame(context, inputName)
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/ed5ac873/measure/src/test/resources/env-batch.json
----------------------------------------------------------------------
diff --git a/measure/src/test/resources/env-batch.json b/measure/src/test/resources/env-batch.json
index 3e8aa80..0d6ea8a 100644
--- a/measure/src/test/resources/env-batch.json
+++ b/measure/src/test/resources/env-batch.json
@@ -20,15 +20,6 @@
"max.persist.lines": 10000,
"max.lines.per.file": 10000
}
- },
- {
- "type": "http",
- "config": {
- "method": "post",
- "api": "http://10.148.181.248:39200/griffin/accuracy",
- "over.time": "1m",
- "retry": 10
- }
}
],