Posted to commits@griffin.apache.org by gu...@apache.org on 2018/08/10 10:16:39 UTC
[2/3] incubator-griffin git commit: Modify measure module to support updated env and config json files format
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
deleted file mode 100644
index 9e020c2..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-import org.apache.curator.framework.imps.CuratorFrameworkState
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.lock.CacheLockInZK
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConverters._
-
-/**
- * leverage zookeeper for info cache
- * @param config
- * @param metricName
- */
-case class OffsetCacheInZK(config: Map[String, Any], metricName: String) extends OffsetCache with OffsetOps {
-
- val Hosts = "hosts"
- val Namespace = "namespace"
- val Mode = "mode"
- val InitClear = "init.clear"
- val CloseClear = "close.clear"
- val LockPath = "lock.path"
-
- val PersistRegex = """^(?i)persist$""".r
- val EphemeralRegex = """^(?i)ephemeral$""".r
-
- final val separator = ZKPaths.PATH_SEPARATOR
-
- val hosts = config.getOrElse(Hosts, "").toString
- val namespace = config.getOrElse(Namespace, "").toString
- val mode: CreateMode = config.get(Mode) match {
- case Some(s: String) => s match {
- case PersistRegex() => CreateMode.PERSISTENT
- case EphemeralRegex() => CreateMode.EPHEMERAL
- case _ => CreateMode.PERSISTENT
- }
- case _ => CreateMode.PERSISTENT
- }
- val initClear = config.get(InitClear) match {
- case Some(b: Boolean) => b
- case _ => true
- }
- val closeClear = config.get(CloseClear) match {
- case Some(b: Boolean) => b
- case _ => false
- }
- val lockPath = config.getOrElse(LockPath, "lock").toString
-
- private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
- private val builder = CuratorFrameworkFactory.builder()
- .connectString(hosts)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .namespace(cacheNamespace)
- private val client: CuratorFramework = builder.build
-
- def init(): Unit = {
- client.start()
- info("start zk info cache")
- client.usingNamespace(cacheNamespace)
- info(s"init with namespace: ${cacheNamespace}")
- delete(lockPath :: Nil)
- if (initClear) {
- clear
- }
- }
-
- def available(): Boolean = {
- client.getState match {
- case CuratorFrameworkState.STARTED => true
- case _ => false
- }
- }
-
- def close(): Unit = {
- if (closeClear) {
- clear
- }
- info("close zk info cache")
- client.close()
- }
-
- def cache(kvs: Map[String, String]): Unit = {
- kvs.foreach(kv => createOrUpdate(path(kv._1), kv._2))
- }
-
- def read(keys: Iterable[String]): Map[String, String] = {
- keys.flatMap { key =>
- read(path(key)) match {
- case Some(v) => Some((key, v))
- case _ => None
- }
- }.toMap
- }
-
- def delete(keys: Iterable[String]): Unit = {
- keys.foreach { key => delete(path(key)) }
- }
-
- def clear(): Unit = {
-// delete("/")
- delete(finalCacheInfoPath :: Nil)
- delete(infoPath :: Nil)
- info("clear info")
- }
-
- def listKeys(p: String): List[String] = {
- children(path(p))
- }
-
- def genLock(s: String): CacheLockInZK = {
- val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
- CacheLockInZK(new InterProcessMutex(client, lpt))
- }
-
- private def path(k: String): String = {
- if (k.startsWith(separator)) k else separator + k
- }
-
- private def children(path: String): List[String] = {
- try {
- client.getChildren().forPath(path).asScala.toList
- } catch {
- case e: Throwable => {
- warn(s"list ${path} warn: ${e.getMessage}")
- Nil
- }
- }
- }
-
- private def createOrUpdate(path: String, content: String): Boolean = {
- if (checkExists(path)) {
- update(path, content)
- } else {
- create(path, content)
- }
- }
-
- private def create(path: String, content: String): Boolean = {
- try {
- client.create().creatingParentsIfNeeded().withMode(mode)
- .forPath(path, content.getBytes("utf-8"))
- true
- } catch {
- case e: Throwable => {
- error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
- false
- }
- }
- }
-
- private def update(path: String, content: String): Boolean = {
- try {
- client.setData().forPath(path, content.getBytes("utf-8"))
- true
- } catch {
- case e: Throwable => {
- error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
- false
- }
- }
- }
-
- private def read(path: String): Option[String] = {
- try {
- Some(new String(client.getData().forPath(path), "utf-8"))
- } catch {
- case e: Throwable => {
- warn(s"read ${path} warn: ${e.getMessage}")
- None
- }
- }
- }
-
- private def delete(path: String): Unit = {
- try {
- client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
- } catch {
- case e: Throwable => error(s"delete ${path} error: ${e.getMessage}")
- }
- }
-
- private def checkExists(path: String): Boolean = {
- try {
- client.checkExists().forPath(path) != null
- } catch {
- case e: Throwable => {
- warn(s"check exists ${path} warn: ${e.getMessage}")
- false
- }
- }
- }
-
-}
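
The deleted class above survives in this commit as the checkpoint client (OffsetCheckpointClient, referenced throughout the diffs below); its core is Curator's check-then-create-or-update idiom. A minimal standalone sketch of that idiom, assuming a reachable ZooKeeper at the illustrative address localhost:2181:

    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import org.apache.zookeeper.CreateMode

    object ZkCreateOrUpdateSketch extends App {
      val client = CuratorFrameworkFactory.builder()
        .connectString("localhost:2181")              // illustrative ensemble address
        .retryPolicy(new ExponentialBackoffRetry(1000, 3))
        .namespace("griffin-demo")                    // hypothetical namespace
        .build()
      client.start()

      def createOrUpdate(path: String, content: String): Unit = {
        val bytes = content.getBytes("utf-8")
        if (client.checkExists().forPath(path) != null) {
          client.setData().forPath(path, bytes)       // node exists: overwrite its data
        } else {
          client.create().creatingParentsIfNeeded()   // node missing: create it and
            .withMode(CreateMode.PERSISTENT)          // any absent parent znodes
            .forPath(path, bytes)
        }
      }

      createOrUpdate("/info/source/ready.time", System.currentTimeMillis.toString)
      client.close()
    }

The check-then-act pair is racy across processes, which is why the class also hands out InterProcessMutex-backed locks via genLock rather than relying on createOrUpdate alone.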
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
deleted file mode 100644
index c8763ec..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.offset
-
-trait OffsetOps extends Serializable { this: OffsetCache =>
-
- val CacheTime = "cache.time"
- val LastProcTime = "last.proc.time"
- val ReadyTime = "ready.time"
- val CleanTime = "clean.time"
- val OldCacheIndex = "old.cache.index"
-
- def cacheTime(path: String): String = s"${path}/${CacheTime}"
- def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
- def readyTime(path: String): String = s"${path}/${ReadyTime}"
- def cleanTime(path: String): String = s"${path}/${CleanTime}"
- def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
-
- val infoPath = "info"
-
- val finalCacheInfoPath = "info.final"
- val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
- val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
- val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
-
- def startOffsetCache(): Unit = {
- genFinalReadyTime
- }
-
- def getTimeRange(): (Long, Long) = {
- readTimeRange
- }
-
- def getCleanTime(): Long = {
- readCleanTime
- }
-
- def endOffsetCache: Unit = {
- genFinalLastProcTime
- genFinalCleanTime
- }
-
- private def genFinalReadyTime(): Unit = {
- val subPath = listKeys(infoPath)
- val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
- val result = read(keys)
- val times = keys.flatMap { k =>
- getLongOpt(result, k)
- }
- if (times.nonEmpty) {
- val time = times.min
- val map = Map[String, String]((finalReadyTime -> time.toString))
- cache(map)
- }
- }
-
- private def genFinalLastProcTime(): Unit = {
- val subPath = listKeys(infoPath)
- val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
- val result = read(keys)
- val times = keys.flatMap { k =>
- getLongOpt(result, k)
- }
- if (times.nonEmpty) {
- val time = times.min
- val map = Map[String, String]((finalLastProcTime -> time.toString))
- cache(map)
- }
- }
-
- private def genFinalCleanTime(): Unit = {
- val subPath = listKeys(infoPath)
- val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
- val result = read(keys)
- val times = keys.flatMap { k =>
- getLongOpt(result, k)
- }
- if (times.nonEmpty) {
- val time = times.min
- val map = Map[String, String]((finalCleanTime -> time.toString))
- cache(map)
- }
- }
-
- private def readTimeRange(): (Long, Long) = {
- val map = read(List(finalLastProcTime, finalReadyTime))
- val lastProcTime = getLong(map, finalLastProcTime)
- val curReadyTime = getLong(map, finalReadyTime)
- (lastProcTime, curReadyTime)
- }
-
- private def readCleanTime(): Long = {
- val map = read(List(finalCleanTime))
- val cleanTime = getLong(map, finalCleanTime)
- cleanTime
- }
-
- private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
- try {
- map.get(key).map(_.toLong)
- } catch {
- case e: Throwable => None
- }
- }
- private def getLong(map: Map[String, String], key: String) = {
- getLongOpt(map, key).getOrElse(-1L)
- }
-
-}
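
The three genFinal* methods in this deleted trait share one pattern: read a per-source timestamp under info/<source>/..., then publish the minimum as the global value, so the job never advances past its slowest source. A self-contained sketch of that reduction with made-up sample values:

    object FinalReadyTimeSketch extends App {
      // per-source ready times as cached under info/<source>/ready.time (sample values)
      val cached = Map(
        "info/source/ready.time" -> "1533880000000",
        "info/target/ready.time" -> "1533879940000",  // the slowest source
        "info/extra/ready.time"  -> "not-a-number"    // unparsable entries drop out
      )

      // mirrors getLongOpt: a value that fails to parse contributes nothing
      def longOpt(s: String): Option[Long] =
        try Some(s.toLong) catch { case _: Throwable => None }

      val times = cached.values.flatMap(longOpt)
      if (times.nonEmpty)
        println(s"info.final/ready.time -> ${times.min}")  // 1533879940000
    }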
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
index 8bb0735..6bf6373 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -39,6 +39,8 @@ case class DataSource(name: String,
streamingCacheClientOpt: Option[StreamingCacheClient]
) extends Loggable with Serializable {
+ val isBaseline: Boolean = dsParam.isBaseline
+
def init(): Unit = {
dataConnectors.foreach(_.init)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
index f48185e..7807dfd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -50,7 +50,7 @@ object DataSourceFactory extends Loggable {
// for streaming data cache
val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
- sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, timestampStorage)
+ sparkSession.sqlContext, dataSourceParam.getCheckpointOpt, name, index, timestampStorage)
val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
index cef5c53..0ebe6ba 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.step.builder.ConstantColumns
import org.apache.griffin.measure.utils.DataFrameUtil._
@@ -89,8 +89,8 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
val _Updatable = "updatable"
val updatable = param.getBoolean(_Updatable, false)
- val newCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.new")
- val oldCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.old")
+ val newCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.new")
+ val oldCacheLock = OffsetCheckpointClient.genLock(s"${cacheInfoPath}.old")
val newFilePath = s"${filePath}/new"
val oldFilePath = s"${filePath}/old"
@@ -155,7 +155,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
*/
def readData(): (Option[DataFrame], TimeRange) = {
// time range: (a, b]
- val timeRange = OffsetCacheClient.getTimeRange
+ val timeRange = OffsetCheckpointClient.getTimeRange
val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
// read partition info
@@ -349,7 +349,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
*/
def processFinish(): Unit = {
// next last proc time
- val timeRange = OffsetCacheClient.getTimeRange
+ val timeRange = OffsetCheckpointClient.getTimeRange
submitLastProcTime(timeRange._2)
// next clean time
@@ -359,7 +359,7 @@ trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long]
// read next clean time
private def getNextCleanTime(): Long = {
- val timeRange = OffsetCacheClient.getTimeRange
+ val timeRange = OffsetCheckpointClient.getTimeRange
val nextCleanTime = timeRange._2 + deltaTimeRange._1
nextCleanTime
}
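
The readData change keeps the existing half-open window arithmetic: the checkpoint supplies the range (lastProcTime, readyTime], and deltaTimeRange shifts both ends, typically backwards, so late-arriving records are re-read. A small worked sketch with illustrative numbers:

    object ReviseTimeRangeSketch extends App {
      // checkpoint time range (a, b]: processed through a, data ready through b
      val timeRange: (Long, Long) = (1533880000000L, 1533880060000L)   // sample values

      // a negative begin delta widens the window 30 s into the past (assumed config)
      val deltaTimeRange: (Long, Long) = (-30000L, 0L)

      val reviseTimeRange =
        (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)

      println(s"read cached partitions where ${reviseTimeRange._1} < tmst <= ${reviseTimeRange._2}")
    }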
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
index 17b15e9..f991e2d 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -36,17 +36,17 @@ object StreamingCacheClientFactory extends Loggable {
/**
* create streaming cache client
- * @param sqlContext sqlContext in spark environment
- * @param cacheOpt data source cache config option
- * @param name data source name
- * @param index data source index
- * @param tmstCache the same tmstCache instance inside a data source
- * @return streaming cache client option
+ * @param sqlContext sqlContext in spark environment
+ * @param checkpointOpt data source checkpoint/cache config option
+ * @param name data source name
+ * @param index data source index
+ * @param tmstCache the same tmstCache instance inside a data source
+ * @return streaming cache client option
*/
- def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
+ def getClientOpt(sqlContext: SQLContext, checkpointOpt: Option[Map[String, Any]],
name: String, index: Int, tmstCache: TimestampStorage
): Option[StreamingCacheClient] = {
- cacheOpt.flatMap { param =>
+ checkpointOpt.flatMap { param =>
try {
val tp = param.getString(_type, "")
val dsCache = tp match {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
index 52c1650..7b7f506 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -19,7 +19,7 @@ under the License.
package org.apache.griffin.measure.datasource.cache
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
/**
* timestamp offset of streaming data source cache
@@ -30,30 +30,30 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
val readyTimeInterval: Long
val readyTimeDelay: Long
- def selfCacheInfoPath = s"${OffsetCacheClient.infoPath}/${cacheInfoPath}"
+ def selfCacheInfoPath = s"${OffsetCheckpointClient.infoPath}/${cacheInfoPath}"
- def selfCacheTime = OffsetCacheClient.cacheTime(selfCacheInfoPath)
- def selfLastProcTime = OffsetCacheClient.lastProcTime(selfCacheInfoPath)
- def selfReadyTime = OffsetCacheClient.readyTime(selfCacheInfoPath)
- def selfCleanTime = OffsetCacheClient.cleanTime(selfCacheInfoPath)
- def selfOldCacheIndex = OffsetCacheClient.oldCacheIndex(selfCacheInfoPath)
+ def selfCacheTime = OffsetCheckpointClient.cacheTime(selfCacheInfoPath)
+ def selfLastProcTime = OffsetCheckpointClient.lastProcTime(selfCacheInfoPath)
+ def selfReadyTime = OffsetCheckpointClient.readyTime(selfCacheInfoPath)
+ def selfCleanTime = OffsetCheckpointClient.cleanTime(selfCacheInfoPath)
+ def selfOldCacheIndex = OffsetCheckpointClient.oldCacheIndex(selfCacheInfoPath)
protected def submitCacheTime(ms: Long): Unit = {
val map = Map[String, String]((selfCacheTime -> ms.toString))
- OffsetCacheClient.cache(map)
+ OffsetCheckpointClient.cache(map)
}
protected def submitReadyTime(ms: Long): Unit = {
val curReadyTime = ms - readyTimeDelay
if (curReadyTime % readyTimeInterval == 0) {
val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
- OffsetCacheClient.cache(map)
+ OffsetCheckpointClient.cache(map)
}
}
protected def submitLastProcTime(ms: Long): Unit = {
val map = Map[String, String]((selfLastProcTime -> ms.toString))
- OffsetCacheClient.cache(map)
+ OffsetCheckpointClient.cache(map)
}
protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
@@ -61,7 +61,7 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
protected def submitCleanTime(ms: Long): Unit = {
val cleanTime = genCleanTime(ms)
val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
- OffsetCacheClient.cache(map)
+ OffsetCheckpointClient.cache(map)
}
protected def genCleanTime(ms: Long): Long = ms
@@ -70,13 +70,13 @@ trait StreamingOffsetCacheable extends Loggable with Serializable {
protected def submitOldCacheIndex(index: Long): Unit = {
val map = Map[String, String]((selfOldCacheIndex -> index.toString))
- OffsetCacheClient.cache(map)
+ OffsetCheckpointClient.cache(map)
}
def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
private def readSelfInfo(key: String): Option[Long] = {
- OffsetCacheClient.read(key :: Nil).get(key).flatMap { v =>
+ OffsetCheckpointClient.read(key :: Nil).get(key).flatMap { v =>
try {
Some(v.toLong)
} catch {
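
Untouched by the rename, submitReadyTime only publishes when the delayed timestamp lands exactly on an interval boundary, throttling checkpoint writes to at most one per interval. A sketch of that gate with assumed settings:

    object ReadyTimeGateSketch extends App {
      val readyTimeInterval = 10000L  // publish at most every 10 s (assumed)
      val readyTimeDelay    = 5000L   // hold ready time back 5 s for stragglers (assumed)

      def readyTimeToPublish(ms: Long): Option[Long] = {
        val curReadyTime = ms - readyTimeDelay
        // only batch timestamps aligned to the interval produce a checkpoint write
        if (curReadyTime % readyTimeInterval == 0) Some(curReadyTime) else None
      }

      println(readyTimeToPublish(1533880005000L))  // Some(1533880000000): aligned
      println(readyTimeToPublish(1533880006000L))  // None: off the boundary, skipped
    }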
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
index fc09661..caea078 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -21,13 +21,13 @@ package org.apache.griffin.measure.datasource.connector
import java.util.concurrent.atomic.AtomicLong
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType}
+import org.apache.griffin.measure.configuration.enums.BatchProcessType
import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
import org.apache.griffin.measure.datasource.TimestampStorage
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
+import org.apache.griffin.measure.step.builder.preproc.PreProcParamMaker
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
@@ -38,7 +38,6 @@ trait DataConnector extends Loggable with Serializable {
val dcParam: DataConnectorParam
val id: String = DataConnectorIdGenerator.genId
- protected def thisName(suffix: String): String = s"this_${suffix}"
val timestampStorage: TimestampStorage
protected def saveTmst(t: Long) = timestampStorage.insert(t)
@@ -59,13 +58,13 @@ trait DataConnector extends Loggable with Serializable {
val timestamp = context.contextId.timestamp
val suffix = context.contextId.id
- val thisTable = thisName(suffix)
+ val dcDfName = dcParam.getDataFrameName("this")
try {
saveTmst(timestamp) // save timestamp
dfOpt.flatMap { df =>
- val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
+ val (preProcRules, thisTable) = PreProcParamMaker.makePreProcRules(dcParam.getPreProcRules, suffix, dcDfName)
// init data
context.compileTableRegister.registerTable(thisTable)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
index 8860408..3b19892 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/DQApp.scala
@@ -19,7 +19,7 @@ under the License.
package org.apache.griffin.measure.launch
import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig}
+import org.apache.griffin.measure.configuration.dqdefinition.{DQConfig, EnvConfig, SinkParam}
import scala.util.Try
@@ -54,4 +54,11 @@ trait DQApp extends Loggable with Serializable {
}
}
+ protected def getSinkParams: Seq[SinkParam] = {
+ val validSinkTypes = dqParam.getValidSinkTypes
+ envParam.getSinkParams.flatMap { sinkParam =>
+ if (validSinkTypes.contains(sinkParam.getType)) Some(sinkParam) else None
+ }
+ }
+
}
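
The new getSinkParams helper intersects the env file's sink definitions with the sink types the DQ job declares valid, so a job config selects from what the environment offers. The same filter, stripped down to plain strings for illustration (the real code compares the new sink-type enums):

    object SinkSelectionSketch extends App {
      val envSinks       = Seq("console", "hdfs", "elasticsearch", "mongo")  // env.json (assumed)
      val validSinkTypes = Seq("console", "elasticsearch")                   // dq config (assumed)

      // same shape as DQApp.getSinkParams: keep only the env sinks the job opted into
      val selected = envSinks.flatMap { s =>
        if (validSinkTypes.contains(s)) Some(s) else None
      }

      println(selected)  // List(console, elasticsearch)
    }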
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index a651f2e..8733789 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -41,7 +41,7 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
val metricName = dqParam.name
// val dataSourceParams = dqParam.dataSources
// val dataSourceNames = dataSourceParams.map(_.name)
- val persistParams = envParam.persistParams
+ val sinkParams = getSinkParams
var sqlContext: SQLContext = _
@@ -75,12 +75,12 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
// create dq context
val dqContext: DQContext = DQContext(
- contextId, metricName, dataSources, persistParams, BatchProcessType
+ contextId, metricName, dataSources, sinkParams, BatchProcessType
)(sparkSession)
// start id
val applicationId = sparkSession.sparkContext.applicationId
- dqContext.getPersist().start(applicationId)
+ dqContext.getSink().start(applicationId)
// build job
val dqJob = DQJobBuilder.buildDQJob(dqContext, dqParam.evaluateRule)
@@ -90,13 +90,13 @@ case class BatchDQApp(allParam: GriffinConfig) extends DQApp {
// end time
val endTime = new Date().getTime
- dqContext.getPersist().log(endTime, s"process using time: ${endTime - startTime} ms")
+ dqContext.getSink().log(endTime, s"process using time: ${endTime - startTime} ms")
// clean context
dqContext.clean()
// finish
- dqContext.getPersist().finish()
+ dqContext.getSink().finish()
}
def close: Try[_] = Try {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index 126bedd..1768ae2 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -25,8 +25,8 @@ import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition._
import org.apache.griffin.measure.context._
+import org.apache.griffin.measure.context.streaming.checkpoint.offset.OffsetCheckpointClient
import org.apache.griffin.measure.datasource.DataSourceFactory
-import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
import org.apache.griffin.measure.context.streaming.metric.CacheResults
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
@@ -47,7 +47,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
val metricName = dqParam.name
// val dataSourceParams = dqParam.dataSources
// val dataSourceNames = dataSourceParams.map(_.name)
- val persistParams = envParam.persistParams
+ val sinkParams = getSinkParams
var sqlContext: SQLContext = _
@@ -68,8 +68,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
clearCpDir
// init info cache instance
- OffsetCacheClient.initClient(envParam.offsetCacheParams, metricName)
- OffsetCacheClient.init
+ OffsetCheckpointClient.initClient(envParam.checkpointParams, metricName)
+ OffsetCheckpointClient.init
// register udf
GriffinUDFAgent.register(sqlContext)
@@ -99,12 +99,12 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
// create dq context
val globalContext: DQContext = DQContext(
- contextId, metricName, dataSources, persistParams, StreamingProcessType
+ contextId, metricName, dataSources, sinkParams, StreamingProcessType
)(sparkSession)
// start id
val applicationId = sparkSession.sparkContext.applicationId
- globalContext.getPersist().start(applicationId)
+ globalContext.getSink().start(applicationId)
// process thread
val dqCalculator = StreamingDQCalculator(globalContext, dqParam.evaluateRule)
@@ -124,7 +124,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
globalContext.clean()
// finish
- globalContext.getPersist().finish()
+ globalContext.getSink().finish()
}
@@ -163,8 +163,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
evaluateRuleParam: EvaluateRuleParam
) extends Runnable with Loggable {
- val lock = OffsetCacheClient.genLock("process")
- val appPersist = globalContext.getPersist()
+ val lock = OffsetCheckpointClient.genLock("process")
+ val appSink = globalContext.getSink()
def run(): Unit = {
val updateTimeDate = new Date()
@@ -174,10 +174,10 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
if (locked) {
try {
- OffsetCacheClient.startOffsetCache
+ OffsetCheckpointClient.startOffsetCheckpoint
val startTime = new Date().getTime
- appPersist.log(startTime, "starting process ...")
+ appSink.log(startTime, "starting process ...")
val contextId = ContextId(startTime)
// create dq context
@@ -194,9 +194,9 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
// end time
val endTime = new Date().getTime
- appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
+ appSink.log(endTime, s"process using time: ${endTime - startTime} ms")
- OffsetCacheClient.endOffsetCache
+ OffsetCheckpointClient.endOffsetCheckpoint
// clean old data
cleanData(dqContext)
@@ -225,7 +225,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
context.clean()
- val cleanTime = OffsetCacheClient.getCleanTime
+ val cleanTime = OffsetCheckpointClient.getCleanTime
CacheResults.refresh(cleanTime)
} catch {
case e: Throwable => error(s"clean data error: ${e.getMessage}")
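
Each streaming round in the renamed code is the same lock-guarded sequence: take the process lock from the checkpoint client, freeze the offset window, run the job, release. The CacheLockInZK API isn't shown here, so this schematic uses a plain JVM lock as a stand-in:

    import java.util.concurrent.locks.ReentrantLock

    // stand-in for the ZK-backed mutex from OffsetCheckpointClient.genLock("process")
    val lock = new ReentrantLock()

    val locked = lock.tryLock()   // mirrors the `if (locked)` guard in run()
    if (locked) {
      try {
        // startOffsetCheckpoint: fix the (lastProcTime, readyTime] window for this round
        // ... build and run the DQ job over that window ...
        // endOffsetCheckpoint: record lastProcTime and cleanTime for the next round
      } finally lock.unlock()
    }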
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
index 9e0acd8..306befe 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ConsoleSink.scala
@@ -23,7 +23,7 @@ import org.apache.griffin.measure.utils.ParamUtil._
import org.apache.spark.rdd.RDD
/**
- * persist metric and record to console, for debug
+ * sink metric and record to console, for debug
*/
case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
@@ -46,7 +46,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp:
println(s"[${timeStamp}] ${rt}: ${msg}")
}
- def persistRecords(records: RDD[String], name: String): Unit = {
+ def sinkRecords(records: RDD[String], name: String): Unit = {
// println(s"${metricName} [${timeStamp}] records: ")
// try {
// val recordCount = records.count
@@ -61,7 +61,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp:
// }
}
- def persistRecords(records: Iterable[String], name: String): Unit = {
+ def sinkRecords(records: Iterable[String], name: String): Unit = {
// println(s"${metricName} [${timeStamp}] records: ")
// try {
// val recordCount = records.size
@@ -74,7 +74,7 @@ case class ConsoleSink(config: Map[String, Any], metricName: String, timeStamp:
// }
}
- def persistMetrics(metrics: Map[String, Any]): Unit = {
+ def sinkMetrics(metrics: Map[String, Any]): Unit = {
println(s"${metricName} [${timeStamp}] metrics: ")
val json = JsonUtil.toJson(metrics)
println(json)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
new file mode 100644
index 0000000..e5f72d1
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/ElasticSearchSink.scala
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.sink
+
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
+import org.apache.spark.rdd.RDD
+
+import scala.concurrent.Future
+
+/**
+ * sink metric and record through http request
+ */
+case class ElasticSearchSink(config: Map[String, Any], metricName: String,
+ timeStamp: Long, block: Boolean
+ ) extends Sink {
+
+ val Api = "api"
+ val Method = "method"
+ val ConnectionTimeout = "connection.timeout"
+ val Retry = "retry"
+
+ val api = config.getString(Api, "")
+ val method = config.getString(Method, "post")
+ val connectionTimeout = TimeUtil.milliseconds(config.getString(ConnectionTimeout, "")).getOrElse(-1L)
+ val retry = config.getInt(Retry, 10)
+
+ val _Value = "value"
+
+ def available(): Boolean = {
+ api.nonEmpty
+ }
+
+ def start(msg: String): Unit = {}
+ def finish(): Unit = {}
+
+ private def httpResult(dataMap: Map[String, Any]) = {
+ try {
+ val data = JsonUtil.toJson(dataMap)
+ // http request
+ val params = Map[String, Object]()
+ val header = Map[String, Object](("Content-Type","application/json"))
+
+ def func(): (Long, Future[Boolean]) = {
+ import scala.concurrent.ExecutionContext.Implicits.global
+ (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data)))
+ }
+ if (block) SinkTaskRunner.addBlockTask(func _, retry, connectionTimeout)
+ else SinkTaskRunner.addNonBlockTask(func _, retry)
+ } catch {
+ case e: Throwable => error(e.getMessage)
+ }
+
+ }
+
+ def log(rt: Long, msg: String): Unit = {}
+
+ def sinkRecords(records: RDD[String], name: String): Unit = {}
+ def sinkRecords(records: Iterable[String], name: String): Unit = {}
+
+ def sinkMetrics(metrics: Map[String, Any]): Unit = {
+ httpResult(metrics)
+ }
+
+}
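
A hypothetical construction of the new sink, showing the config keys it reads (api, method, connection.timeout, retry) with assumed values; note sinkMetrics hands the JSON body to SinkTaskRunner instead of posting inline:

    import org.apache.griffin.measure.sink.ElasticSearchSink

    // the URL, index and metric name are illustrative only
    val esSink = ElasticSearchSink(
      config = Map(
        "api"                -> "http://localhost:9200/griffin/accuracy",
        "method"             -> "post",
        "connection.timeout" -> "1m",
        "retry"              -> 10
      ),
      metricName = "accu_batch",
      timeStamp  = System.currentTimeMillis,
      block      = true   // queue through SinkTaskRunner.addBlockTask and wait
    )

    if (esSink.available())   // true whenever "api" is non-empty
      esSink.sinkMetrics(Map("total" -> 100, "miss" -> 4, "matched" -> 96))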
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
index 5608e7d..718f1c1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/HdfsSink.scala
@@ -25,7 +25,7 @@ import org.apache.griffin.measure.utils.{HdfsUtil, JsonUtil}
import org.apache.spark.rdd.RDD
/**
- * persist metric and record to hdfs
+ * sink metric and record to hdfs
*/
case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Long) extends Sink {
@@ -111,7 +111,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
HdfsUtil.deleteHdfsPath(path)
}
- def persistRecords(records: RDD[String], name: String): Unit = {
+ def sinkRecords(records: RDD[String], name: String): Unit = {
val path = filePath(name)
clearOldRecords(path)
try {
@@ -121,7 +121,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
val groupCount = ((count - 1) / maxLinesPerFile + 1).toInt
if (groupCount <= 1) {
val recs = records.take(count.toInt)
- persistRecords2Hdfs(path, recs)
+ sinkRecords2Hdfs(path, recs)
} else {
val groupedRecords: RDD[(Long, Iterable[String])] =
records.zipWithIndex.flatMap { r =>
@@ -131,7 +131,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
groupedRecords.foreach { group =>
val (gid, recs) = group
val hdfsPath = if (gid == 0) path else withSuffix(path, gid.toString)
- persistRecords2Hdfs(hdfsPath, recs)
+ sinkRecords2Hdfs(hdfsPath, recs)
}
}
}
@@ -140,7 +140,7 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
}
}
- def persistRecords(records: Iterable[String], name: String): Unit = {
+ def sinkRecords(records: Iterable[String], name: String): Unit = {
val path = filePath(name)
clearOldRecords(path)
try {
@@ -150,13 +150,13 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
val groupCount = (count - 1) / maxLinesPerFile + 1
if (groupCount <= 1) {
val recs = records.take(count.toInt)
- persistRecords2Hdfs(path, recs)
+ sinkRecords2Hdfs(path, recs)
} else {
val groupedRecords = records.grouped(maxLinesPerFile).zipWithIndex
groupedRecords.take(groupCount).foreach { group =>
val (recs, gid) = group
val hdfsPath = getHdfsPath(path, gid)
- persistRecords2Hdfs(hdfsPath, recs)
+ sinkRecords2Hdfs(hdfsPath, recs)
}
}
}
@@ -165,16 +165,16 @@ case class HdfsSink(config: Map[String, Any], metricName: String, timeStamp: Lon
}
}
- def persistMetrics(metrics: Map[String, Any]): Unit = {
+ def sinkMetrics(metrics: Map[String, Any]): Unit = {
try {
val json = JsonUtil.toJson(metrics)
- persistRecords2Hdfs(MetricsFile, json :: Nil)
+ sinkRecords2Hdfs(MetricsFile, json :: Nil)
} catch {
case e: Throwable => error(e.getMessage)
}
}
- private def persistRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
+ private def sinkRecords2Hdfs(hdfsPath: String, records: Iterable[String]): Unit = {
try {
val recStr = records.mkString("\n")
HdfsUtil.writeContent(hdfsPath, recStr)
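
The renamed sinkRecords methods keep the existing file-splitting arithmetic: (count - 1) / maxLinesPerFile + 1 is integer ceiling division, and group 0 writes to the bare path while later groups get a suffixed path. A quick check with assumed sizes:

    object HdfsGroupingSketch extends App {
      val maxLinesPerFile = 10000L  // assumed per-file cap

      // ceiling division without floating point, as in HdfsSink
      def groupCount(count: Long): Long = (count - 1) / maxLinesPerFile + 1

      println(groupCount(10000L))  // 1: a single file at the bare path
      println(groupCount(10001L))  // 2: bare path plus one suffixed file
      println(groupCount(25000L))  // 3: bare path plus two suffixed files
    }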
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala
deleted file mode 100644
index 63677c5..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/HttpSink.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.sink
-
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HttpUtil, JsonUtil, TimeUtil}
-import org.apache.spark.rdd.RDD
-
-import scala.concurrent.Future
-
-/**
- * persist metric and record through http request
- */
-case class HttpSink(config: Map[String, Any], metricName: String,
- timeStamp: Long, block: Boolean
- ) extends Sink {
-
- val Api = "api"
- val Method = "method"
- val OverTime = "over.time"
- val Retry = "retry"
-
- val api = config.getString(Api, "")
- val method = config.getString(Method, "post")
- val overTime = TimeUtil.milliseconds(config.getString(OverTime, "")).getOrElse(-1L)
- val retry = config.getInt(Retry, 10)
-
- val _Value = "value"
-
- def available(): Boolean = {
- api.nonEmpty
- }
-
- def start(msg: String): Unit = {}
- def finish(): Unit = {}
-
- private def httpResult(dataMap: Map[String, Any]) = {
- try {
- val data = JsonUtil.toJson(dataMap)
- // http request
- val params = Map[String, Object]()
- val header = Map[String, Object](("Content-Type","application/json"))
-
- def func(): (Long, Future[Boolean]) = {
- import scala.concurrent.ExecutionContext.Implicits.global
- (timeStamp, Future(HttpUtil.httpRequest(api, method, params, header, data)))
- }
- if (block) SinkTaskRunner.addBlockTask(func _, retry, overTime)
- else SinkTaskRunner.addNonBlockTask(func _, retry)
- } catch {
- case e: Throwable => error(e.getMessage)
- }
-
- }
-
- def log(rt: Long, msg: String): Unit = {}
-
- def persistRecords(records: RDD[String], name: String): Unit = {}
- def persistRecords(records: Iterable[String], name: String): Unit = {}
-
- def persistMetrics(metrics: Map[String, Any]): Unit = {
- httpResult(metrics)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
index 6eb55e5..206f187 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MongoSink.scala
@@ -28,7 +28,7 @@ import org.mongodb.scala.result.UpdateResult
import scala.concurrent.Future
/**
- * persist metric and record to mongo
+ * sink metric and record to mongo
*/
case class MongoSink(config: Map[String, Any], metricName: String,
timeStamp: Long, block: Boolean
@@ -53,10 +53,10 @@ case class MongoSink(config: Map[String, Any], metricName: String,
def log(rt: Long, msg: String): Unit = {}
- def persistRecords(records: RDD[String], name: String): Unit = {}
- def persistRecords(records: Iterable[String], name: String): Unit = {}
+ def sinkRecords(records: RDD[String], name: String): Unit = {}
+ def sinkRecords(records: Iterable[String], name: String): Unit = {}
- def persistMetrics(metrics: Map[String, Any]): Unit = {
+ def sinkMetrics(metrics: Map[String, Any]): Unit = {
mongoInsert(metrics)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
index ede52cc..cca1ff9 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/MultiSinks.scala
@@ -21,13 +21,13 @@ package org.apache.griffin.measure.sink
import org.apache.spark.rdd.RDD
/**
- * persist metric and record in multiple ways
+ * sink metric and record in multiple ways
*/
-case class MultiSinks(persists: Iterable[Sink]) extends Sink {
+case class MultiSinks(sinks: Iterable[Sink]) extends Sink {
val block: Boolean = false
- val headSinkOpt: Option[Sink] = persists.headOption
+ val headSinkOpt: Option[Sink] = sinks.headOption
val metricName: String = headSinkOpt.map(_.metricName).getOrElse("")
@@ -35,45 +35,45 @@ case class MultiSinks(persists: Iterable[Sink]) extends Sink {
val config: Map[String, Any] = Map[String, Any]()
- def available(): Boolean = { persists.exists(_.available()) }
+ def available(): Boolean = { sinks.exists(_.available()) }
- def start(msg: String): Unit = { persists.foreach(_.start(msg)) }
- def finish(): Unit = { persists.foreach(_.finish()) }
+ def start(msg: String): Unit = { sinks.foreach(_.start(msg)) }
+ def finish(): Unit = { sinks.foreach(_.finish()) }
def log(rt: Long, msg: String): Unit = {
- persists.foreach { persist =>
+ sinks.foreach { sink =>
try {
- persist.log(rt, msg)
+ sink.log(rt, msg)
} catch {
case e: Throwable => error(s"log error: ${e.getMessage}")
}
}
}
- def persistRecords(records: RDD[String], name: String): Unit = {
- persists.foreach { persist =>
+ def sinkRecords(records: RDD[String], name: String): Unit = {
+ sinks.foreach { sink =>
try {
- persist.persistRecords(records, name)
+ sink.sinkRecords(records, name)
} catch {
- case e: Throwable => error(s"persist records error: ${e.getMessage}")
+ case e: Throwable => error(s"sink records error: ${e.getMessage}")
}
}
}
- def persistRecords(records: Iterable[String], name: String): Unit = {
- persists.foreach { persist =>
+ def sinkRecords(records: Iterable[String], name: String): Unit = {
+ sinks.foreach { sink =>
try {
- persist.persistRecords(records, name)
+ sink.sinkRecords(records, name)
} catch {
- case e: Throwable => error(s"persist records error: ${e.getMessage}")
+ case e: Throwable => error(s"sink records error: ${e.getMessage}")
}
}
}
- def persistMetrics(metrics: Map[String, Any]): Unit = {
- persists.foreach { persist =>
+ def sinkMetrics(metrics: Map[String, Any]): Unit = {
+ sinks.foreach { sink =>
try {
- persist.persistMetrics(metrics)
+ sink.sinkMetrics(metrics)
} catch {
- case e: Throwable => error(s"persist metrics error: ${e.getMessage}")
+ case e: Throwable => error(s"sink metrics error: ${e.getMessage}")
}
}
}
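
MultiSinks is a plain composite: it fans each call out to every configured sink and catches per-sink failures so one broken sink cannot stop the others. A minimal usage sketch against the renamed API (empty config maps stand in for real sink settings, which an HdfsSink would normally need):

    import org.apache.griffin.measure.sink.{ConsoleSink, HdfsSink, MultiSinks}

    val ts = System.currentTimeMillis
    val sinks = MultiSinks(Seq(
      ConsoleSink(Map.empty[String, Any], "accu_batch", ts),
      HdfsSink(Map.empty[String, Any], "accu_batch", ts)
    ))

    sinks.start("app-id")                                // forwarded to every sink
    sinks.sinkMetrics(Map("total" -> 100, "miss" -> 4))  // each sink gets the same map
    sinks.finish()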
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
index f052ae1..0c03bd8 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/Sink.scala
@@ -22,7 +22,7 @@ import org.apache.griffin.measure.Loggable
import org.apache.spark.rdd.RDD
/**
- * persist metric and record
+ * sink metric and record
*/
trait Sink extends Loggable with Serializable {
val metricName: String
@@ -39,9 +39,9 @@ trait Sink extends Loggable with Serializable {
def log(rt: Long, msg: String): Unit
- def persistRecords(records: RDD[String], name: String): Unit
- def persistRecords(records: Iterable[String], name: String): Unit
+ def sinkRecords(records: RDD[String], name: String): Unit
+ def sinkRecords(records: Iterable[String], name: String): Unit
- def persistMetrics(metrics: Map[String, Any]): Unit
+ def sinkMetrics(metrics: Map[String, Any]): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
index 85bf1b1..26b0178 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkFactory.scala
@@ -18,39 +18,36 @@ under the License.
*/
package org.apache.griffin.measure.sink
-import org.apache.griffin.measure.configuration.dqdefinition.PersistParam
+import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
+import org.apache.griffin.measure.configuration.enums._
import scala.util.{Success, Try}
-case class SinkFactory(persistParams: Iterable[PersistParam], metricName: String) extends Serializable {
-
- val HDFS_REGEX = """^(?i)hdfs$""".r
- val HTTP_REGEX = """^(?i)http$""".r
- val LOG_REGEX = """^(?i)log$""".r
- val MONGO_REGEX = """^(?i)mongo$""".r
+case class SinkFactory(sinkParams: Iterable[SinkParam], metricName: String) extends Serializable {
/**
- * create persist
- * @param timeStamp the timestamp of persist
- * @param block persist write metric in block or non-block way
- * @return persist
+ * create sink
+ * @param timeStamp the timestamp of sink
+ * @param block sink write metric in block or non-block way
+ * @return sink
*/
- def getPersists(timeStamp: Long, block: Boolean): MultiSinks = {
- MultiSinks(persistParams.flatMap(param => getPersist(timeStamp, param, block)))
+ def getSinks(timeStamp: Long, block: Boolean): MultiSinks = {
+ MultiSinks(sinkParams.flatMap(param => getSink(timeStamp, param, block)))
}
- private def getPersist(timeStamp: Long, persistParam: PersistParam, block: Boolean): Option[Sink] = {
- val config = persistParam.getConfig
- val persistTry = persistParam.getType match {
- case LOG_REGEX() => Try(ConsoleSink(config, metricName, timeStamp))
- case HDFS_REGEX() => Try(HdfsSink(config, metricName, timeStamp))
- case HTTP_REGEX() => Try(HttpSink(config, metricName, timeStamp, block))
- case MONGO_REGEX() => Try(MongoSink(config, metricName, timeStamp, block))
- case _ => throw new Exception("not supported persist type")
+ private def getSink(timeStamp: Long, sinkParam: SinkParam, block: Boolean): Option[Sink] = {
+ val config = sinkParam.getConfig
+ val sinkType = sinkParam.getType
+ val sinkTry = sinkType match {
+ case ConsoleSinkType => Try(ConsoleSink(config, metricName, timeStamp))
+ case HdfsSinkType => Try(HdfsSink(config, metricName, timeStamp))
+ case ElasticsearchSinkType => Try(ElasticSearchSink(config, metricName, timeStamp, block))
+ case MongoSinkType => Try(MongoSink(config, metricName, timeStamp, block))
+ case _ => throw new Exception(s"sink type ${sinkType} is not supported!")
}
- persistTry match {
- case Success(persist) if (persist.available) => Some(persist)
+ sinkTry match {
+ case Success(sink) if (sink.available) => Some(sink)
case _ => None
}
}
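
The factory now dispatches on parsed sink-type enums instead of regex matches, and getSinks still drops any sink whose available() check fails (an ElasticSearchSink with an empty api, for instance). A hedged sketch of the call site, with sinkParams assumed to come from DQApp.getSinkParams:

    import org.apache.griffin.measure.configuration.dqdefinition.SinkParam
    import org.apache.griffin.measure.sink.{MultiSinks, SinkFactory}

    def buildSinks(sinkParams: Seq[SinkParam]): MultiSinks = {
      val factory = SinkFactory(sinkParams, "accu_batch")   // metric name assumed
      // block = true: blocking sinks finish their writes before the batch ends
      factory.getSinks(System.currentTimeMillis, block = true)
    }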
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
index 79f0bb1..1cc3f3e 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/sink/SinkTaskRunner.scala
@@ -28,7 +28,7 @@ import scala.concurrent.duration._
import scala.util.{Failure, Success}
/**
- * persist task runner, to persist metrics in block or non-block mode
+ * sink task runner, to sink metrics in block or non-block mode
*/
object SinkTaskRunner extends Loggable {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
index f945856..796c797 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/DataFrameOpsDQStepBuilder.scala
@@ -26,9 +26,10 @@ import org.apache.griffin.measure.step.transform.DataFrameOpsTransformStep
case class DataFrameOpsDQStepBuilder() extends RuleParamStepBuilder {
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
- val name = getStepName(ruleParam.getName)
+ val name = getStepName(ruleParam.getOutDfName())
+ val inputDfName = getStepName(ruleParam.getInDfName())
val transformStep = DataFrameOpsTransformStep(
- name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
+ name, inputDfName, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
transformStep +: buildDirectWriteSteps(ruleParam)
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
index 7dd3c73..d3c0e41 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/GriffinDslDQStepBuilder.scala
@@ -36,14 +36,14 @@ case class GriffinDslDQStepBuilder(dataSourceNames: Seq[String],
val parser = GriffinDslParser(dataSourceNames, filteredFunctionNames)
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
- val name = getStepName(ruleParam.getName)
+ val name = getStepName(ruleParam.getOutDfName())
val rule = ruleParam.getRule
val dqType = ruleParam.getDqType
try {
val result = parser.parseRule(rule, dqType)
if (result.successful) {
val expr = result.get
- val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceName(name))
+ val expr2DQSteps = Expr2DQSteps(context, expr, ruleParam.replaceOutDfName(name))
expr2DQSteps.getDQSteps()
} else {
warn(s"parse rule [ ${rule} ] fails: \n${result}")
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
index f9f101e..176aed6 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/RuleParamStepBuilder.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.step.builder
-import org.apache.griffin.measure.configuration.enums.NormalizeType
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.write.{DataSourceUpdateWriteStep, MetricWriteStep, RecordWriteStep}
@@ -41,18 +41,18 @@ trait RuleParamStepBuilder extends DQStepBuilder {
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep]
protected def buildDirectWriteSteps(ruleParam: RuleParam): Seq[DQStep] = {
- val name = getStepName(ruleParam.getName)
+ val name = getStepName(ruleParam.getOutDfName())
// metric writer
- val metricSteps = ruleParam.getMetricOpt.map { metric =>
- MetricWriteStep(metric.getNameOpt.getOrElse(name), name, NormalizeType(metric.collectType))
+ val metricSteps = ruleParam.getOutputOpt(MetricOutputType).map { metric =>
+ MetricWriteStep(metric.getNameOpt.getOrElse(name), name, FlattenType(metric.flatten))
}.toSeq
// record writer
- val recordSteps = ruleParam.getRecordOpt.map { record =>
+ val recordSteps = ruleParam.getOutputOpt(RecordOutputType).map { record =>
RecordWriteStep(record.getNameOpt.getOrElse(name), name)
}.toSeq
// update writer
- val dsCacheUpdateSteps = ruleParam.getDsCacheUpdateOpt.map { dsCacheUpdate =>
- DataSourceUpdateWriteStep(dsCacheUpdate.getDsNameOpt.getOrElse(""), name)
+ val dsCacheUpdateSteps = ruleParam.getOutputOpt(DscUpdateOutputType).map { dsCacheUpdate =>
+ DataSourceUpdateWriteStep(dsCacheUpdate.getNameOpt.getOrElse(""), name)
}.toSeq
metricSteps ++ recordSteps ++ dsCacheUpdateSteps
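
buildDirectWriteSteps now derives all three writers from a single outputs list
on the rule, selected by output type, instead of the old format's three
dedicated fields (metric / record / dsCacheUpdate). Roughly the shape implied
by the calls above, as a separate simplified stand-in (the real OutputParam
definition lives in the config model and also carries the flatten setting):

    sealed trait OutputType
    case object MetricOutputType extends OutputType
    case object RecordOutputType extends OutputType
    case object DscUpdateOutputType extends OutputType

    // Sketch only: name is optional in config, hence getNameOpt.
    case class OutputParam(outputType: OutputType, name: String = "") {
      def getNameOpt: Option[String] = Option(name).filter(_.nonEmpty)
    }

    case class RuleParam(outputs: Seq[OutputParam] = Nil) {
      // first output of the requested type, if any
      def getOutputOpt(tp: OutputType): Option[OutputParam] =
        outputs.find(_.outputType == tp)
    }

One consequence visible in the dsCacheUpdate lines of this hunk: the dedicated
getDsNameOpt accessor disappears, and the data source name is read through the
generic getNameOpt like any other output name.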
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
index f66192c..b5dfd0c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/SparkSqlDQStepBuilder.scala
@@ -26,7 +26,7 @@ import org.apache.griffin.measure.step.transform.SparkSqlTransformStep
case class SparkSqlDQStepBuilder() extends RuleParamStepBuilder {
def buildSteps(context: DQContext, ruleParam: RuleParam): Seq[DQStep] = {
- val name = getStepName(ruleParam.getName)
+ val name = getStepName(ruleParam.getOutDfName())
val transformStep = SparkSqlTransformStep(
name, ruleParam.getRule, ruleParam.getDetails, ruleParam.getCache)
transformStep +: buildDirectWriteSteps(ruleParam)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
index 27872c2..7c84d38 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/AccuracyExpr2DQSteps.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
@@ -82,7 +82,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val missRecordsTransStep = SparkSqlTransformStep(missRecordsTableName, missRecordsSql, emptyMap, true)
val missRecordsWriteSteps = procType match {
case BatchProcessType => {
- val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+ val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName) :: Nil
}
case StreamingProcessType => Nil
@@ -90,7 +90,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val missRecordsUpdateWriteSteps = procType match {
case BatchProcessType => Nil
case StreamingProcessType => {
- val dsName = ruleParam.getDsCacheUpdateOpt.flatMap(_.getDsNameOpt).getOrElse(sourceName)
+ val dsName = ruleParam.getOutputOpt(DscUpdateOutputType).flatMap(_.getNameOpt).getOrElse(sourceName)
DataSourceUpdateWriteStep(dsName, missRecordsTableName) :: Nil
}
}
@@ -114,7 +114,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
// 4. accuracy metric
- val accuracyTableName = ruleParam.name
+ val accuracyTableName = ruleParam.outDfName
val matchedColName = details.getStringOrKey(_matched)
val accuracyMetricSql = procType match {
case BatchProcessType => {
@@ -139,10 +139,10 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyTransStep = SparkSqlTransformStep(accuracyTableName, accuracyMetricSql, emptyMap)
val accuracyMetricWriteSteps = procType match {
case BatchProcessType => {
- val metricOpt = ruleParam.getMetricOpt
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
- val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
- MetricWriteStep(mwName, accuracyTableName, collectType) :: Nil
+ val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ MetricWriteStep(mwName, accuracyTableName, flattenType) :: Nil
}
case StreamingProcessType => Nil
}
@@ -159,18 +159,17 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyMetricTableName = "__accuracy"
val accuracyMetricRule = DataFrameOps._accuracy
val accuracyMetricDetails = Map[String, Any](
- (AccuracyOprKeys._dfName -> accuracyTableName),
(AccuracyOprKeys._miss -> missColName),
(AccuracyOprKeys._total -> totalColName),
(AccuracyOprKeys._matched -> matchedColName)
)
val accuracyMetricTransStep = DataFrameOpsTransformStep(accuracyMetricTableName,
- accuracyMetricRule, accuracyMetricDetails)
+ accuracyTableName, accuracyMetricRule, accuracyMetricDetails)
val accuracyMetricWriteStep = {
- val metricOpt = ruleParam.getMetricOpt
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
- val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
- MetricWriteStep(mwName, accuracyMetricTableName, collectType)
+ val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ MetricWriteStep(mwName, accuracyMetricTableName, flattenType)
}
// 6. collect accuracy records
@@ -184,7 +183,7 @@ case class AccuracyExpr2DQSteps(context: DQContext,
val accuracyRecordTransStep = SparkSqlTransformStep(
accuracyRecordTableName, accuracyRecordSql, emptyMap)
val accuracyRecordWriteStep = {
- val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
+ val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(missRecordsTableName)
RecordWriteStep(rwName, missRecordsTableName, Some(accuracyRecordTableName))
}
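
Besides the output and flatten renames, this file shows the
DataFrameOpsTransformStep change: the input dataframe name (accuracyTableName)
becomes an explicit argument instead of an entry passed through the details
map under AccuracyOprKeys._dfName. A sketch of the step's shape after this
commit (parameter names are assumptions; only the argument order is taken from
the hunk, and the real class is a full TransformStep implementation, not a
bare case class):

    case class DataFrameOpsTransformStep(
      name: String,               // output table, e.g. "__accuracy"
      inputDfName: String,        // input table, e.g. the step-4 accuracy table
      rule: String,               // op id, e.g. DataFrameOps._accuracy
      details: Map[String, Any])  // op options; no longer carries the input name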
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
index 469b16e..347fabd 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/CompletenessExpr2DQSteps.scala
@@ -89,7 +89,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val incompleteRecordsSql = s"SELECT * FROM `${sourceAliasTableName}` WHERE ${incompleteWhereClause}"
val incompleteRecordTransStep = SparkSqlTransformStep(incompleteRecordsTableName, incompleteRecordsSql, emptyMap, true)
val incompleteRecordWriteStep = {
- val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
+ val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(incompleteRecordsTableName)
RecordWriteStep(rwName, incompleteRecordsTableName)
}
@@ -112,7 +112,7 @@ case class CompletenessExpr2DQSteps(context: DQContext,
val totalCountTransStep = SparkSqlTransformStep(totalCountTableName, totalCountSql, emptyMap)
// 5. complete metric
- val completeTableName = ruleParam.name
+ val completeTableName = ruleParam.outDfName
val completeColName = details.getStringOrKey(_complete)
val completeMetricSql = procType match {
case BatchProcessType => {
@@ -136,10 +136,10 @@ case class CompletenessExpr2DQSteps(context: DQContext,
}
val completeTransStep = SparkSqlTransformStep(completeTableName, completeMetricSql, emptyMap)
val completeWriteStep = {
- val metricOpt = ruleParam.getMetricOpt
+ val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(completeTableName)
- val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
- MetricWriteStep(mwName, completeTableName, collectType)
+ val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ MetricWriteStep(mwName, completeTableName, flattenType)
}
val transSteps = {
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
index 3390263..6c56a77 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/DistinctnessExpr2DQSteps.scala
@@ -18,7 +18,7 @@ under the License.
*/
package org.apache.griffin.measure.step.builder.dsl.transform
-import org.apache.griffin.measure.configuration.enums.{ArrayNormalizeType, EntriesNormalizeType, ProcessType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
@@ -111,7 +111,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
val totalMetricWriteStep = {
- MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType, writeTimestampOpt)
+ MetricWriteStep(totalColName, totalTableName, EntriesFlattenType, writeTimestampOpt)
}
// 3. group by self
@@ -221,7 +221,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val distTransStep = SparkSqlTransformStep(distTableName, distSql, emptyMap)
val distMetricWriteStep = {
- MetricWriteStep(distColName, distTableName, EntriesNormalizeType, writeTimestampOpt)
+ MetricWriteStep(distColName, distTableName, EntriesFlattenType, writeTimestampOpt)
}
val transSteps3 = distTransStep :: Nil
@@ -271,7 +271,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val dupItemsTransStep = SparkSqlTransformStep(dupItemsTableName, dupItemsSql, emptyMap)
val dupItemsWriteStep = {
- val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
+ val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupItemsTableName)
RecordWriteStep(rwName, dupItemsTableName, None, writeTimestampOpt)
}
@@ -287,7 +287,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val groupDupMetricTransStep = SparkSqlTransformStep(groupDupMetricTableName, groupDupMetricSql, emptyMap)
val groupDupMetricWriteStep = {
- MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayNormalizeType, writeTimestampOpt)
+ MetricWriteStep(duplicationArrayName, groupDupMetricTableName, ArrayFlattenType, writeTimestampOpt)
}
val msteps = {
@@ -317,7 +317,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
val dupRecordWriteStep = {
- val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+ val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
RecordWriteStep(rwName, dupRecordTableName, None, writeTimestampOpt)
}
@@ -332,7 +332,7 @@ case class DistinctnessExpr2DQSteps(context: DQContext,
}
val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
val dupMetricWriteStep = {
- MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType, writeTimestampOpt)
+ MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType, writeTimestampOpt)
}
val msteps = {
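
The NormalizeType family is renamed to FlattenType throughout; the variants
used in this commit map one to one (EntriesNormalizeType to EntriesFlattenType,
ArrayNormalizeType to ArrayFlattenType, DefaultNormalizeType to
DefaultFlattenType). A sketch covering just the variants that appear in these
hunks (the real enum may define more, and the semantics in the comments are
assumptions read off the names):

    sealed trait FlattenType
    case object DefaultFlattenType extends FlattenType // keep the metric as produced
    case object EntriesFlattenType extends FlattenType // lift fields to top-level entries
    case object ArrayFlattenType extends FlattenType   // collect rows into an array

    object FlattenType {
      // assumed: used when the config specifies no flatten setting
      val default: FlattenType = DefaultFlattenType
    }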
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
index 28fa96a..33d44c5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/ProfilingExpr2DQSteps.scala
@@ -19,7 +19,7 @@ under the License.
package org.apache.griffin.measure.step.builder.dsl.transform
import org.apache.commons.lang.StringUtils
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, NormalizeType, StreamingProcessType}
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, FlattenType, MetricOutputType, StreamingProcessType}
import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
import org.apache.griffin.measure.context.DQContext
import org.apache.griffin.measure.step.DQStep
@@ -92,13 +92,13 @@ case class ProfilingExpr2DQSteps(context: DQContext,
val profilingSql = {
s"SELECT ${selCondition} ${selClause} ${fromClause} ${preGroupbyClause} ${groupbyClause} ${postGroupbyClause}"
}
- val profilingName = ruleParam.name
+ val profilingName = ruleParam.outDfName
val profilingTransStep = SparkSqlTransformStep(profilingName, profilingSql, details)
val profilingMetricWriteStep = {
- val metricOpt = ruleParam.getMetricOpt
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
- val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
- MetricWriteStep(mwName, profilingName, collectType)
+ val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ MetricWriteStep(mwName, profilingName, flattenType)
}
profilingTransStep :: profilingMetricWriteStep :: Nil
}
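
The profiling hunk shows the metric-write fallback chain in its simplest form:
the metric name falls back to the rule's out-dataframe name, and the flatten
type falls back to the enum default. Factored into a helper for clarity (a
sketch only; the commit inlines this at every call site, and it assumes the
OutputParam/FlattenType shapes sketched above plus a getFlatten accessor and
an outDfName field):

    def metricWriteParams(ruleParam: RuleParam): (String, FlattenType) = {
      val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
      val name = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
      val flatten = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
      (name, flatten)                    // feed straight into MetricWriteStep
    }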
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
index e9eaa06..71e9f4b 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/TimelinessExpr2DQSteps.scala
@@ -108,7 +108,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
val latencyTransStep = SparkSqlTransformStep(latencyTableName, latencySql, emptyMap, true)
// 3. timeliness metric
- val metricTableName = ruleParam.name
+ val metricTableName = ruleParam.outDfName
val totalColName = details.getStringOrKey(_total)
val avgColName = details.getStringOrKey(_avg)
val metricSql = procType match {
@@ -131,10 +131,10 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
val metricTransStep = SparkSqlTransformStep(metricTableName, metricSql, emptyMap)
val metricWriteStep = {
- val metricOpt = ruleParam.getMetricOpt
- val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.name)
- val collectType = metricOpt.map(_.getCollectType).getOrElse(NormalizeType.default)
- MetricWriteStep(mwName, metricTableName, collectType)
+ val metricOpt = ruleParam.getOutputOpt(MetricOutputType)
+ val mwName = metricOpt.flatMap(_.getNameOpt).getOrElse(ruleParam.outDfName)
+ val flattenType = metricOpt.map(_.getFlatten).getOrElse(FlattenType.default)
+ MetricWriteStep(mwName, metricTableName, flattenType)
}
// current steps
@@ -150,7 +150,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
val recordTransStep = SparkSqlTransformStep(recordTableName, recordSql, emptyMap)
val recordWriteStep = {
- val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(recordTableName)
+ val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(recordTableName)
RecordWriteStep(rwName, recordTableName, None)
}
(recordTransStep :: Nil, recordWriteStep :: Nil)
@@ -191,7 +191,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
val rangeMetricTransStep = SparkSqlTransformStep(rangeMetricTableName, rangeMetricSql, emptyMap)
val rangeMetricWriteStep = {
- MetricWriteStep(stepColName, rangeMetricTableName, ArrayNormalizeType)
+ MetricWriteStep(stepColName, rangeMetricTableName, ArrayFlattenType)
}
(rangeTransStep :: rangeMetricTransStep :: Nil, rangeMetricWriteStep :: Nil)
@@ -216,7 +216,7 @@ case class TimelinessExpr2DQSteps(context: DQContext,
}
val percentileTransStep = SparkSqlTransformStep(percentileTableName, percentileSql, emptyMap)
val percentileWriteStep = {
- MetricWriteStep(percentileTableName, percentileTableName, DefaultNormalizeType)
+ MetricWriteStep(percentileTableName, percentileTableName, DefaultFlattenType)
}
(percentileTransStep :: Nil, percentileWriteStep :: Nil)
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
index 8827bf1..443239c 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/dsl/transform/UniquenessExpr2DQSteps.scala
@@ -129,7 +129,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
}
}
val totalTransStep = SparkSqlTransformStep(totalTableName, totalSql, emptyMap)
- val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesNormalizeType)
+ val totalMetricWriteStep = MetricWriteStep(totalColName, totalTableName, EntriesFlattenType)
// 6. unique record
val uniqueRecordTableName = "__uniqueRecord"
@@ -151,7 +151,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
}
}
val uniqueTransStep = SparkSqlTransformStep(uniqueTableName, uniqueSql, emptyMap)
- val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesNormalizeType)
+ val uniqueMetricWriteStep = MetricWriteStep(uniqueColName, uniqueTableName, EntriesFlattenType)
val transSteps1 = sourceTransStep :: targetTransStep :: joinedTransStep :: groupTransStep ::
totalTransStep :: uniqueRecordTransStep :: uniqueTransStep :: Nil
@@ -166,7 +166,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
}
val dupRecordTransStep = SparkSqlTransformStep(dupRecordTableName, dupRecordSql, emptyMap, true)
val dupRecordWriteStep = {
- val rwName = ruleParam.getRecordOpt.flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
+ val rwName = ruleParam.getOutputOpt(RecordOutputType).flatMap(_.getNameOpt).getOrElse(dupRecordTableName)
RecordWriteStep(rwName, dupRecordTableName)
}
@@ -189,7 +189,7 @@ case class UniquenessExpr2DQSteps(context: DQContext,
}
val dupMetricTransStep = SparkSqlTransformStep(dupMetricTableName, dupMetricSql, emptyMap)
val dupMetricWriteStep = {
- MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayNormalizeType)
+ MetricWriteStep(duplicationArrayName, dupMetricTableName, ArrayFlattenType)
}
(dupRecordTransStep :: dupMetricTransStep :: Nil,
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
new file mode 100644
index 0000000..eac3b2b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcParamMaker.scala
@@ -0,0 +1,67 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.step.builder.preproc
+
+import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
+import org.apache.griffin.measure.configuration.enums._
+
+/**
+ * Generates the pre-processing rule params for each entity, based on the template defined in the pre-proc param.
+ */
+object PreProcParamMaker {
+
+ case class StringAnyMap(values: Map[String, Any])
+
+ def makePreProcRules(rules: Seq[RuleParam], suffix: String, dfName: String): (Seq[RuleParam], String) = {
+ val len = rules.size
+ val (newRules, _) = rules.zipWithIndex.foldLeft((Nil: Seq[RuleParam], dfName)) { (ret, pair) =>
+ val (rls, prevOutDfName) = ret
+ val (rule, i) = pair
+ val inName = rule.getInDfName(prevOutDfName)
+ val outName = if (i == len - 1) dfName else rule.getOutDfName(genNameWithIndex(dfName, i))
+ val ruleWithNames = rule.replaceInOutDfName(inName, outName)
+ (rls :+ makeNewPreProcRule(ruleWithNames, suffix), outName)
+ }
+ (newRules, withSuffix(dfName, suffix))
+ }
+
+ private def makeNewPreProcRule(rule: RuleParam, suffix: String): RuleParam = {
+ val newInDfName = withSuffix(rule.getInDfName(), suffix)
+ val newOutDfName = withSuffix(rule.getOutDfName(), suffix)
+ val rpRule = rule.replaceInOutDfName(newInDfName, newOutDfName)
+ rule.getDslType match {
+ case DataFrameOpsType => rpRule
+ case _ => {
+ val newRule = replaceDfNameSuffix(rule.getRule, rule.getInDfName(), suffix)
+ rpRule.replaceRule(newRule)
+ }
+ }
+ }
+
+ private def genNameWithIndex(name: String, i: Int): String = s"${name}${i}"
+
+ private def replaceDfNameSuffix(str: String, dfName: String, suffix: String): String = {
+ val regexStr = s"(?i)${dfName}"
+ val replaceDfName = withSuffix(dfName, suffix)
+ str.replaceAll(regexStr, replaceDfName)
+ }
+
+ def withSuffix(str: String, suffix: String): String = s"${str}_${suffix}"
+
+}
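
The fold in makePreProcRules threads each rule's output name into the next
rule's default input name, forces the last rule to write back to dfName, and
then suffixes every name for the concrete entity. A runnable sketch of just
the naming (plain strings instead of RuleParam; in practice the suffix would
be something like a data source or stream index):

    object PreProcNamingDemo extends App {
      def withSuffix(s: String, suffix: String): String = s"${s}_${suffix}"

      // in/out name pairs for n unnamed rules, mirroring makePreProcRules
      def chainNames(n: Int, dfName: String, suffix: String): Seq[(String, String)] =
        (0 until n).foldLeft(Seq.empty[(String, String)] -> dfName) {
          case ((acc, prevOut), i) =>
            val in = prevOut                                   // default in: previous out
            val out = if (i == n - 1) dfName else s"$dfName$i" // last rule writes to dfName
            (acc :+ (withSuffix(in, suffix) -> withSuffix(out, suffix)), out)
        }._1

      // List((source_0,source0_0), (source0_0,source1_0), (source1_0,source_0))
      println(chainNames(3, "source", "0"))
    }

So downstream steps can always read the pre-processed entity from
withSuffix(dfName, suffix), which is exactly the second element that
makePreProcRules returns.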
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/23ff999c/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala b/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
deleted file mode 100644
index 64b8623..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/step/builder/preproc/PreProcRuleParamGenerator.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.step.builder.preproc
-
-import org.apache.griffin.measure.configuration.dqdefinition.RuleParam
-
-/**
- * generate rule params by template defined in pre-proc param
- */
-object PreProcRuleParamGenerator {
-
- case class StringAnyMap(values:Map[String,Any])
-
- val _name = "name"
-
- def getNewPreProcRules(rules: Seq[RuleParam], suffix: String): Seq[RuleParam] = {
- rules.map { rule =>
- getNewPreProcRule(rule, suffix)
- }
- }
-
- private def getNewPreProcRule(param: RuleParam, suffix: String): RuleParam = {
- val newName = genNewString(param.getName, suffix)
- val newRule = genNewString(param.getRule, suffix)
- val newDetails = getNewMap(param.getDetails, suffix)
- param.replaceName(newName).replaceRule(newRule).replaceDetails(newDetails)
- }
-
- private def getNewMap(details: Map[String, Any], suffix: String): Map[String, Any] = {
- val keys = details.keys
- keys.foldLeft(details) { (map, key) =>
- map.get(key) match {
- case Some(s: String) => map + (key -> genNewString(s, suffix))
- case Some(subMap: StringAnyMap) => map + (key -> getNewMap(subMap.values, suffix))
- case Some(arr: Seq[_]) => map + (key -> getNewArr(arr, suffix))
- case _ => map
- }
- }
- }
-
- private def getNewArr(paramArr: Seq[Any], suffix: String): Seq[Any] = {
- paramArr.foldLeft(Nil: Seq[Any]) { (res, param) =>
- param match {
- case s: String => res :+ genNewString(s, suffix)
- case map: StringAnyMap => res :+ getNewMap(map.values, suffix)
- case arr: Seq[_] => res :+ getNewArr(arr, suffix)
- case _ => res :+ param
- }
- }
- }
-
- private def genNewString(str: String, suffix: String): String = {
- str.replaceAll("""\$\{(.*)\}""", s"$$1_${suffix}")
- }
-
-}
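
For contrast with the new PreProcParamMaker: the deleted generator rewrote
${...} placeholders inside rule text rather than renaming plain dataframe
names. Its genNewString behavior, reproduced standalone:

    val suffix = "0"  // e.g. an entity index
    "select * from ${source}".replaceAll("""\$\{(.*)\}""", s"$$1_$suffix")
    // => "select * from source_0"
    // (.*) is greedy, so this assumed at most one placeholder per string;
    // the new maker sidesteps placeholder syntax entirely with explicit
    // in/out dataframe names.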