Posted to commits@griffin.apache.org by gu...@apache.org on 2017/09/30 08:35:25 UTC
[10/11] incubator-griffin git commit: Dsl modify
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
index 6ee9783..2abf3e5 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/EvaluateRuleParam.scala
@@ -23,8 +23,8 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.config.params.Param
@JsonInclude(Include.NON_NULL)
-case class EvaluateRuleParam( @JsonProperty("sampleRatio") sampleRatio: Double,
- @JsonProperty("rules") rules: String
+case class EvaluateRuleParam( @JsonProperty("dsl.type") dslType: String,
+ @JsonProperty("rules") rules: List[Map[String, Any]]
) extends Param {
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
index df0647c..e55d2b4 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/config/params/user/UserParam.scala
@@ -23,12 +23,10 @@ import com.fasterxml.jackson.annotation.JsonInclude.Include
import org.apache.griffin.measure.config.params.Param
@JsonInclude(Include.NON_NULL)
-case class UserParam(@JsonProperty("name") name: String,
- @JsonProperty("type") dqType: String,
- @JsonProperty("process.type") procType: String,
- @JsonProperty("source") sourceParam: DataConnectorParam,
- @JsonProperty("target") targetParam: DataConnectorParam,
- @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
+case class UserParam( @JsonProperty("name") name: String,
+ @JsonProperty("process.type") procType: String,
+ @JsonProperty("data.sources") dataSources: List[DataSourceParam],
+ @JsonProperty("evaluateRule") evaluateRuleParam: EvaluateRuleParam
) extends Param {
}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
deleted file mode 100644
index 1fb1868..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnector.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector
-
-import org.apache.griffin.measure.log.Loggable
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-
-
-trait DataConnector extends Loggable with Serializable {
-
- def available(): Boolean
-
- def init(): Unit
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
deleted file mode 100644
index 670175d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/DataConnectorFactory.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.config.params.user._
-import org.apache.griffin.measure.connector.cache._
-import org.apache.griffin.measure.connector.direct._
-import org.apache.griffin.measure.connector.streaming._
-import org.apache.griffin.measure.rule.RuleExprs
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-
-import scala.reflect.ClassTag
-import scala.util.Try
-
-object DataConnectorFactory {
-
- val HiveRegex = """^(?i)hive$""".r
- val AvroRegex = """^(?i)avro$""".r
-
- val KafkaRegex = """^(?i)kafka$""".r
-
- val TextRegex = """^(?i)text$""".r
-
- def getDirectDataConnector(sqlContext: SQLContext,
- ssc: StreamingContext,
- dataConnectorParam: DataConnectorParam,
- ruleExprs: RuleExprs,
- globalFinalCacheMap: Map[String, Any]
- ): Try[DirectDataConnector] = {
- val conType = dataConnectorParam.conType
- val version = dataConnectorParam.version
- val config = dataConnectorParam.config
- Try {
- conType match {
- case HiveRegex() => HiveDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
- case AvroRegex() => AvroDirectDataConnector(sqlContext, config, ruleExprs, globalFinalCacheMap)
- case KafkaRegex() => {
- val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
- val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
- KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam, ruleExprs, globalFinalCacheMap)
- }
- case _ => throw new Exception("connector creation error!")
- }
- }
- }
-
- private def getStreamingDataConnector(ssc: StreamingContext,
- dataConnectorParam: DataConnectorParam
- ): Try[StreamingDataConnector] = {
- val conType = dataConnectorParam.conType
- val version = dataConnectorParam.version
- val config = dataConnectorParam.config
- Try {
- conType match {
- case KafkaRegex() => {
- genKafkaDataConnector(ssc, config)
- }
- case _ => throw new Exception("streaming connector creation error!")
- }
- }
- }
-
- private def getCacheDataConnector(sqlContext: SQLContext,
- dataCacheParam: DataCacheParam
- ): Try[CacheDataConnector] = {
- if (dataCacheParam == null) {
- throw new Exception("invalid data cache param!")
- }
- val cacheType = dataCacheParam.cacheType
- Try {
- cacheType match {
- case HiveRegex() => HiveCacheDataConnector(sqlContext, dataCacheParam)
- case TextRegex() => TextCacheDataConnector(sqlContext, dataCacheParam)
- case _ => throw new Exception("cache connector creation error!")
- }
- }
- }
-
- private def genKafkaDataConnector(ssc: StreamingContext, config: Map[String, Any]) = {
- val KeyType = "key.type"
- val ValueType = "value.type"
- val keyType = config.getOrElse(KeyType, "java.lang.String").toString
- val valueType = config.getOrElse(ValueType, "java.lang.String").toString
-// val KafkaConfig = "kafka.config"
-// val Topics = "topics"
-// val kafkaConfig = config.get(KafkaConfig) match {
-// case Some(map: Map[String, Any]) => map.mapValues(_.toString).map(identity)
-// case _ => Map[String, String]()
-// }
-// val topics = config.getOrElse(Topics, "").toString
- (getClassTag(keyType), getClassTag(valueType)) match {
- case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
- if (ssc == null) throw new Exception("streaming context is null! ")
- new KafkaStreamingDataConnector(ssc, config) {
- type K = String
- type KD = StringDecoder
- type V = String
- type VD = StringDecoder
- def createDStream(topicSet: Set[String]): InputDStream[(K, V)] = {
- KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
- }
- }
- }
- case _ => {
- throw new Exception("not supported type kafka data connector")
- }
- }
- }
-
- private def getClassTag(tp: String): ClassTag[_] = {
- try {
- val clazz = Class.forName(tp)
- ClassTag(clazz)
- } catch {
- case e: Throwable => throw e
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
deleted file mode 100644
index 1dfe8e2..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/CacheDataConnector.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import org.apache.griffin.measure.connector.DataConnector
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-trait CacheDataConnector extends DataConnector with DataCacheable with DataUpdatable {
-
- def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit
-
- def readData(): Try[RDD[Map[String, Any]]]
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
deleted file mode 100644
index 2be87a6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataCacheable.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-
-trait DataCacheable {
-
- protected val defCacheInfoPath = PathCounter.genPath
-
- val cacheInfoPath: String
- val readyTimeInterval: Long
- val readyTimeDelay: Long
-
- def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-
- def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
- def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
- def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
- def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
-
- protected def submitCacheTime(ms: Long): Unit = {
- val map = Map[String, String]((selfCacheTime -> ms.toString))
- InfoCacheInstance.cacheInfo(map)
- }
-
- protected def submitReadyTime(ms: Long): Unit = {
- val curReadyTime = ms - readyTimeDelay
- if (curReadyTime % readyTimeInterval == 0) {
- val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
- InfoCacheInstance.cacheInfo(map)
- }
- }
-
- protected def submitLastProcTime(ms: Long): Unit = {
- val map = Map[String, String]((selfLastProcTime -> ms.toString))
- InfoCacheInstance.cacheInfo(map)
- }
-
- protected def submitCleanTime(ms: Long): Unit = {
- val cleanTime = genCleanTime(ms)
- val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
- InfoCacheInstance.cacheInfo(map)
- }
-
- protected def genCleanTime(ms: Long): Long = ms
-
- protected def readCleanTime(): Option[Long] = {
- val key = selfCleanTime
- val keys = key :: Nil
- InfoCacheInstance.readInfo(keys).get(key).flatMap { v =>
- try {
- Some(v.toLong)
- } catch {
- case _ => None
- }
- }
- }
-
-}
-
-object PathCounter {
- private val counter: AtomicLong = new AtomicLong(0L)
- def genPath(): String = s"path_${increment}"
- private def increment(): Long = {
- counter.incrementAndGet()
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
deleted file mode 100644
index 07c8187..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/DataUpdatable.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import org.apache.spark.rdd.RDD
-
-trait DataUpdatable {
-
- def cleanOldData(): Unit = {}
-
- def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {}
- def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
deleted file mode 100644
index e241188..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/HiveCacheDataConnector.scala
+++ /dev/null
@@ -1,351 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.config.params.user.DataCacheParam
-import org.apache.griffin.measure.result.TimeStampInfo
-import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.hive.HiveContext
-
-import scala.util.{Success, Try}
-
-case class HiveCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
- ) extends CacheDataConnector {
-
- if (!sqlContext.isInstanceOf[HiveContext]) {
- throw new Exception("hive context not prepared!")
- }
-
- val config = dataCacheParam.config
- val InfoPath = "info.path"
- val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-
- val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
- val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
- val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
- val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
- case s :: e :: _ => {
- val ns = TimeUtil.milliseconds(s) match {
- case Some(n) if (n < 0) => n
- case _ => 0
- }
- val ne = TimeUtil.milliseconds(e) match {
- case Some(n) if (n < 0) => n
- case _ => 0
- }
- (ns, ne)
- }
- case _ => (0, 0)
- }
-
- val Database = "database"
- val database: String = config.getOrElse(Database, "").toString
- val TableName = "table.name"
- val tableName: String = config.get(TableName) match {
- case Some(s: String) if (s.nonEmpty) => s
- case _ => throw new Exception("invalid table.name!")
- }
- val ParentPath = "parent.path"
- val parentPath: String = config.get(ParentPath) match {
- case Some(s: String) => s
- case _ => throw new Exception("invalid parent.path!")
- }
- val tablePath = HdfsUtil.getHdfsFilePath(parentPath, tableName)
-
- val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
-
- val ReadyTimeInterval = "ready.time.interval"
- val ReadyTimeDelay = "ready.time.delay"
- val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
- val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-
- val TimeStampColumn: String = TimeStampInfo.key
- val PayloadColumn: String = "payload"
-
-// type Schema = (Long, String)
- val schema: List[(String, String)] = List(
- (TimeStampColumn, "bigint"),
- (PayloadColumn, "string")
- )
- val schemaName = schema.map(_._1)
-
-// type Partition = (Long, Long)
- val partition: List[(String, String, String)] = List(
- ("hr", "bigint", "hour"),
- ("min", "bigint", "min")
- )
- val partitionName = partition.map(_._1)
-
- private val fieldSep = """|"""
- private val rowSep = """\n"""
- private val rowSepLiteral = "\n"
-
- private def dbPrefix(): Boolean = {
- database.nonEmpty && !database.equals("default")
- }
-
- private def tableExists(): Boolean = {
- Try {
- if (dbPrefix) {
- sqlContext.tables(database).filter(tableExistsSql).collect.size
- } else {
- sqlContext.tables().filter(tableExistsSql).collect.size
- }
- } match {
- case Success(s) => s > 0
- case _ => false
- }
- }
-
- override def init(): Unit = {
- try {
- if (tableExists) {
- // drop exist table
- val dropSql = s"""DROP TABLE ${concreteTableName}"""
- sqlContext.sql(dropSql)
- }
-
- val colsSql = schema.map { field =>
- s"`${field._1}` ${field._2}"
- }.mkString(", ")
- val partitionsSql = partition.map { partition =>
- s"`${partition._1}` ${partition._2}"
- }.mkString(", ")
- val sql = s"""CREATE EXTERNAL TABLE IF NOT EXISTS ${concreteTableName}
- |(${colsSql}) PARTITIONED BY (${partitionsSql})
- |ROW FORMAT DELIMITED
- |FIELDS TERMINATED BY '${fieldSep}'
- |LINES TERMINATED BY '${rowSep}'
- |STORED AS TEXTFILE
- |LOCATION '${tablePath}'""".stripMargin
- sqlContext.sql(sql)
- } catch {
- case e: Throwable => throw e
- }
- }
-
- def available(): Boolean = {
- true
- }
-
- private def encode(data: Map[String, Any], ms: Long): Option[List[Any]] = {
- try {
- Some(schema.map { field =>
- val (name, _) = field
- name match {
- case TimeStampColumn => ms
- case PayloadColumn => JsonUtil.toJson(data)
- case _ => null
- }
- })
- } catch {
- case _ => None
- }
- }
-
- private def decode(data: List[Any], updateTimeStamp: Boolean): Option[Map[String, Any]] = {
- val dataMap = schemaName.zip(data).toMap
- dataMap.get(PayloadColumn) match {
- case Some(v: String) => {
- try {
- val map = JsonUtil.toAnyMap(v)
- val resMap = if (updateTimeStamp) {
- dataMap.get(TimeStampColumn) match {
- case Some(t) => map + (TimeStampColumn -> t)
- case _ => map
- }
- } else map
- Some(resMap)
- } catch {
- case _ => None
- }
- }
- case _ => None
- }
- }
-
- def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
- val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
- if (newCacheLocked) {
- try {
- val ptns = getPartition(ms)
- val ptnsPath = genPartitionHdfsPath(ptns)
- val dirPath = s"${tablePath}/${ptnsPath}"
- val fileName = s"${ms}"
- val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-
- // encode data
- val dataRdd: RDD[List[Any]] = rdd.flatMap(encode(_, ms))
-
- // save data
- val recordRdd: RDD[String] = dataRdd.map { dt =>
- dt.map(_.toString).mkString(fieldSep)
- }
-
- val dumped = if (!recordRdd.isEmpty) {
- HdfsFileDumpUtil.dump(filePath, recordRdd, rowSepLiteral)
- } else false
-
- // add partition
- if (dumped) {
- val sql = addPartitionSql(concreteTableName, ptns)
- sqlContext.sql(sql)
- }
-
- // submit ms
- submitCacheTime(ms)
- submitReadyTime(ms)
- } catch {
- case e: Throwable => error(s"save data error: ${e.getMessage}")
- } finally {
- newCacheLock.unlock()
- }
- }
- }
-
- def readData(): Try[RDD[Map[String, Any]]] = Try {
- val timeRange = TimeInfoCache.getTimeRange
- submitLastProcTime(timeRange._2)
-
- val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
- submitCleanTime(reviseTimeRange._1)
-
- // read directly through partition info
- val partitionRange = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
- val sql = selectSql(concreteTableName, partitionRange)
- val df = sqlContext.sql(sql)
-
- // decode data
- df.flatMap { row =>
- val dt = schemaName.map { sn =>
- row.getAs[Any](sn)
- }
- decode(dt, true)
- }
- }
-
- override def cleanOldData(): Unit = {
- val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
- if (oldCacheLocked) {
- try {
- val cleanTime = readCleanTime()
- cleanTime match {
- case Some(ct) => {
- // drop partition
- val bound = getPartition(ct)
- val sql = dropPartitionSql(concreteTableName, bound)
- sqlContext.sql(sql)
- }
- case _ => {
- // do nothing
- }
- }
- } catch {
- case e: Throwable => error(s"clean old data error: ${e.getMessage}")
- } finally {
- oldCacheLock.unlock()
- }
- }
- }
-
- override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
- // parallel process different time groups, lock is unnecessary
- val ptns = getPartition(t)
- val ptnsPath = genPartitionHdfsPath(ptns)
- val dirPath = s"${tablePath}/${ptnsPath}"
- val fileName = s"${t}"
- val filePath = HdfsUtil.getHdfsFilePath(dirPath, fileName)
-
- try {
- // remove out time old data
- HdfsFileDumpUtil.remove(dirPath, fileName, true)
-
- // save updated old data
- if (oldData.size > 0) {
- val recordDatas = oldData.flatMap { dt =>
- encode(dt, t)
- }
- val records: Iterable[String] = recordDatas.map { dt =>
- dt.map(_.toString).mkString(fieldSep)
- }
- val dumped = HdfsFileDumpUtil.dump(filePath, records, rowSepLiteral)
- }
- } catch {
- case e: Throwable => error(s"update old data error: ${e.getMessage}")
- }
- }
-
- override protected def genCleanTime(ms: Long): Long = {
- val minPartition = partition.last
- val t1 = TimeUtil.timeToUnit(ms, minPartition._3)
- val t2 = TimeUtil.timeFromUnit(t1, minPartition._3)
- t2
- }
-
- private def getPartition(ms: Long): List[(String, Any)] = {
- partition.map { p =>
- val (name, _, unit) = p
- val t = TimeUtil.timeToUnit(ms, unit)
- (name, t)
- }
- }
- private def getPartitionRange(ms1: Long, ms2: Long): List[(String, (Any, Any))] = {
- partition.map { p =>
- val (name, _, unit) = p
- val t1 = TimeUtil.timeToUnit(ms1, unit)
- val t2 = TimeUtil.timeToUnit(ms2, unit)
- (name, (t1, t2))
- }
- }
-
- private def genPartitionHdfsPath(partition: List[(String, Any)]): String = {
- partition.map(prtn => s"${prtn._1}=${prtn._2}").mkString("/")
- }
- private def addPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
- val partitionSql = partition.map(ptn => (s"`${ptn._1}` = ${ptn._2}")).mkString(", ")
- val sql = s"""ALTER TABLE ${tbn} ADD IF NOT EXISTS PARTITION (${partitionSql})"""
- sql
- }
- private def selectSql(tbn: String, partitionRange: List[(String, (Any, Any))]): String = {
- val clause = partitionRange.map { pr =>
- val (name, (r1, r2)) = pr
- s"""`${name}` BETWEEN '${r1}' and '${r2}'"""
- }.mkString(" AND ")
- val whereClause = if (clause.nonEmpty) s"WHERE ${clause}" else ""
- val sql = s"""SELECT * FROM ${tbn} ${whereClause}"""
- sql
- }
- private def dropPartitionSql(tbn: String, partition: List[(String, Any)]): String = {
- val partitionSql = partition.map(ptn => (s"PARTITION ( `${ptn._1}` < '${ptn._2}' ) ")).mkString(", ")
- val sql = s"""ALTER TABLE ${tbn} DROP ${partitionSql}"""
- println(sql)
- sql
- }
-
- private def tableExistsSql(): String = {
- s"tableName LIKE '${tableName}'"
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
deleted file mode 100644
index 62b6086..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/cache/TextCacheDataConnector.scala
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.cache.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.config.params.user.DataCacheParam
-import org.apache.griffin.measure.result.TimeStampInfo
-import org.apache.griffin.measure.utils.{HdfsFileDumpUtil, HdfsUtil, JsonUtil, TimeUtil}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.util.Try
-
-case class TextCacheDataConnector(sqlContext: SQLContext, dataCacheParam: DataCacheParam
- ) extends CacheDataConnector {
-
- val config = dataCacheParam.config
- val InfoPath = "info.path"
- val cacheInfoPath: String = config.getOrElse(InfoPath, defCacheInfoPath).toString
-
- val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
- val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
- val timeRangeParam: List[String] = if (dataCacheParam.timeRange != null) dataCacheParam.timeRange else Nil
- val deltaTimeRange: (Long, Long) = (timeRangeParam ::: List("0", "0")) match {
- case s :: e :: _ => {
- val ns = TimeUtil.milliseconds(s) match {
- case Some(n) if (n < 0) => n
- case _ => 0
- }
- val ne = TimeUtil.milliseconds(e) match {
- case Some(n) if (n < 0) => n
- case _ => 0
- }
- (ns, ne)
- }
- case _ => (0, 0)
- }
-
- val FilePath = "file.path"
- val filePath: String = config.get(FilePath) match {
- case Some(s: String) => s
- case _ => throw new Exception("invalid file.path!")
- }
-
- val ReadyTimeInterval = "ready.time.interval"
- val ReadyTimeDelay = "ready.time.delay"
- val readyTimeInterval: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeInterval, "1m").toString).getOrElse(60000L)
- val readyTimeDelay: Long = TimeUtil.milliseconds(config.getOrElse(ReadyTimeDelay, "1m").toString).getOrElse(60000L)
-
-// val TimeStampColumn: String = TimeStampInfo.key
-// val PayloadColumn: String = "payload"
-
- // cache schema: Long, String
-// val fields = List[StructField](
-// StructField(TimeStampColumn, LongType),
-// StructField(PayloadColumn, StringType)
-// )
-// val schema = StructType(fields)
-
- // case class CacheData(time: Long, payload: String) {
- // def getTime(): Long = time
- // def getPayload(): String = payload
- // }
-
- private val rowSepLiteral = "\n"
-
- val partitionUnits: List[String] = List("hour", "min")
-
- override def init(): Unit = {
- // do nothing
- }
-
- def available(): Boolean = {
- true
- }
-
- private def encode(data: Map[String, Any], ms: Long): Option[String] = {
- try {
- val map = data + (TimeStampInfo.key -> ms)
- Some(JsonUtil.toJson(map))
- } catch {
- case _: Throwable => None
- }
- }
-
- private def decode(data: String): Option[Map[String, Any]] = {
- try {
- Some(JsonUtil.toAnyMap(data))
- } catch {
- case _: Throwable => None
- }
- }
-
- def saveData(rdd: RDD[Map[String, Any]], ms: Long): Unit = {
- val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
- if (newCacheLocked) {
- try {
- val ptns = getPartition(ms)
- val ptnsPath = genPartitionHdfsPath(ptns)
- val dirPath = s"${filePath}/${ptnsPath}"
- val dataFileName = s"${ms}"
- val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
- // encode data
- val dataRdd: RDD[String] = rdd.flatMap(encode(_, ms))
-
- // save data
- val dumped = if (!dataRdd.isEmpty) {
- HdfsFileDumpUtil.dump(dataFilePath, dataRdd, rowSepLiteral)
- } else false
-
- // submit ms
- submitCacheTime(ms)
- submitReadyTime(ms)
- } catch {
- case e: Throwable => error(s"save data error: ${e.getMessage}")
- } finally {
- newCacheLock.unlock()
- }
- }
- }
-
- def readData(): Try[RDD[Map[String, Any]]] = Try {
- val timeRange = TimeInfoCache.getTimeRange
- submitLastProcTime(timeRange._2)
-
- val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
- submitCleanTime(reviseTimeRange._1)
-
- // read directly through partition info
- val partitionRanges = getPartitionRange(reviseTimeRange._1, reviseTimeRange._2)
- println(s"read time ranges: ${reviseTimeRange}")
- println(s"read partition ranges: ${partitionRanges}")
-
- // list partition paths
- val partitionPaths = listPathsBetweenRanges(filePath :: Nil, partitionRanges)
-
- if (partitionPaths.isEmpty) {
- sqlContext.sparkContext.emptyRDD[Map[String, Any]]
- } else {
- val filePaths = partitionPaths.mkString(",")
- val rdd = sqlContext.sparkContext.textFile(filePaths)
-
- // decode data
- rdd.flatMap { row =>
- decode(row)
- }
- }
- }
-
- override def cleanOldData(): Unit = {
- val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
- if (oldCacheLocked) {
- try {
- val cleanTime = readCleanTime()
- cleanTime match {
- case Some(ct) => {
- // drop partitions
- val bounds = getPartition(ct)
-
- // list partition paths
- val earlierPaths = listPathsEarlierThanBounds(filePath :: Nil, bounds)
-
- // delete out time data path
- earlierPaths.foreach { path =>
- println(s"delete hdfs path: ${path}")
- HdfsUtil.deleteHdfsPath(path)
- }
- }
- case _ => {
- // do nothing
- }
- }
- } catch {
- case e: Throwable => error(s"clean old data error: ${e.getMessage}")
- } finally {
- oldCacheLock.unlock()
- }
- }
- }
-
- override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
- // parallel process different time groups, lock is unnecessary
- val ptns = getPartition(t)
- val ptnsPath = genPartitionHdfsPath(ptns)
- val dirPath = s"${filePath}/${ptnsPath}"
- val dataFileName = s"${t}"
- val dataFilePath = HdfsUtil.getHdfsFilePath(dirPath, dataFileName)
-
- try {
- // remove out time old data
- HdfsFileDumpUtil.remove(dirPath, dataFileName, true)
-
- // save updated old data
- if (oldData.size > 0) {
- val recordDatas = oldData.flatMap { dt =>
- encode(dt, t)
- }
- val dumped = HdfsFileDumpUtil.dump(dataFilePath, recordDatas, rowSepLiteral)
- }
- } catch {
- case e: Throwable => error(s"update old data error: ${e.getMessage}")
- }
- }
-
- override protected def genCleanTime(ms: Long): Long = {
- val minPartitionUnit = partitionUnits.last
- val t1 = TimeUtil.timeToUnit(ms, minPartitionUnit)
- val t2 = TimeUtil.timeFromUnit(t1, minPartitionUnit)
- t2
- }
-
- private def getPartition(ms: Long): List[Long] = {
- partitionUnits.map { unit =>
- TimeUtil.timeToUnit(ms, unit)
- }
- }
- private def getPartitionRange(ms1: Long, ms2: Long): List[(Long, Long)] = {
- partitionUnits.map { unit =>
- val t1 = TimeUtil.timeToUnit(ms1, unit)
- val t2 = TimeUtil.timeToUnit(ms2, unit)
- (t1, t2)
- }
- }
-
- private def genPartitionHdfsPath(partition: List[Long]): String = {
- partition.map(prtn => s"${prtn}").mkString("/")
- }
-
- private def str2Long(str: String): Option[Long] = {
- try {
- Some(str.toLong)
- } catch {
- case e: Throwable => None
- }
- }
-
- // here the range means [min, max], but the best range should be (min, max]
- private def listPathsBetweenRanges(paths: List[String],
- partitionRanges: List[(Long, Long)]
- ): List[String] = {
- partitionRanges match {
- case Nil => paths
- case head :: tail => {
- val (lb, ub) = head
- val curPaths = paths.flatMap { path =>
- val names = HdfsUtil.listSubPaths(path, "dir").toList
- names.filter { name =>
- str2Long(name) match {
- case Some(t) => (t >= lb) && (t <= ub)
- case _ => false
- }
- }.map(HdfsUtil.getHdfsFilePath(path, _))
- }
- listPathsBetweenRanges(curPaths, tail)
- }
- }
- }
-
- private def listPathsEarlierThanBounds(paths: List[String], bounds: List[Long]
- ): List[String] = {
- bounds match {
- case Nil => paths
- case head :: tail => {
- val earlierPaths = paths.flatMap { path =>
- val names = HdfsUtil.listSubPaths(path, "dir").toList
- names.filter { name =>
- str2Long(name) match {
- case Some(t) => (t < head)
- case _ => false
- }
- }.map(HdfsUtil.getHdfsFilePath(path, _))
- }
- val equalPaths = paths.flatMap { path =>
- val names = HdfsUtil.listSubPaths(path, "dir").toList
- names.filter { name =>
- str2Long(name) match {
- case Some(t) => (t == head)
- case _ => false
- }
- }.map(HdfsUtil.getHdfsFilePath(path, _))
- }
-
- tail match {
- case Nil => earlierPaths
- case _ => earlierPaths ::: listPathsEarlierThanBounds(equalPaths, tail)
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
deleted file mode 100644
index b45e5a9..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/AvroDirectDataConnector.scala
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.util.Try
-
-// data connector for avro file
-case class AvroDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any],
- ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
- ) extends DirectDataConnector {
-
- val FilePath = "file.path"
- val FileName = "file.name"
-
- val filePath = config.getOrElse(FilePath, "").toString
- val fileName = config.getOrElse(FileName, "").toString
-
- val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
-
- private def pathPrefix(): Boolean = {
- filePath.nonEmpty
- }
-
- private def fileExist(): Boolean = {
- HdfsUtil.existPath(concreteFileFullPath)
- }
-
- def available(): Boolean = {
- (!concreteFileFullPath.isEmpty) && fileExist
- }
-
- def init(): Unit = {}
-
- def metaData(): Try[Iterable[(String, String)]] = {
- Try {
- val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
- st.fields.map(f => (f.name, f.dataType.typeName))
- }
- }
-
- def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
- Try {
- loadDataFile.flatMap { row =>
- // generate cache data
- val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
- val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
- // data info
- val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
- try {
- (info.key -> row.getAs[info.T](info.key))
- } catch {
- case e: Throwable => info.defWrap
- }
- }.toMap
-
- finalExprValueMaps.flatMap { finalExprValueMap =>
- val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
- expr.calculate(finalExprValueMap) match {
- case Some(v) => Some(v.asInstanceOf[AnyRef])
- case _ => None
- }
- }
- val key = toTuple(groupbyData)
-
- Some((key, (finalExprValueMap, dataInfoMap)))
- }
-
-// val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-// ExprValueUtil.genExprValueMaps(Some(row), expr, cachedMap)
-// }
-// val finalExprValueMap = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMap)
-
- // when clause filter data source
-// val whenResult = ruleExprs.whenClauseExprOpt match {
-// case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-// case _ => None
-// }
-//
-// // get groupby data
-// whenResult match {
-// case Some(false) => None
-// case _ => {
-// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-// expr.calculate(finalExprValueMap) match {
-// case Some(v) => Some(v.asInstanceOf[AnyRef])
-// case _ => None
-// }
-// }
-// val key = toTuple(groupbyData)
-//
-// Some((key, finalExprValueMap))
-// }
-// }
- }
- }
- }
-
- private def loadDataFile() = {
- sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
- }
-
- private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
- if (as.size > 0) {
- val tupleClass = Class.forName("scala.Tuple" + as.size)
- tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
- } else None
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
deleted file mode 100644
index ac1a792..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/DirectDataConnector.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.connector.DataConnector
-import org.apache.griffin.measure.connector.cache.DataUpdatable
-import org.apache.spark.rdd.RDD
-
-import scala.util.Try
-
-
-trait DirectDataConnector extends DataConnector with DataUpdatable {
-
- def metaData(): Try[Iterable[(String, String)]]
-
- def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]]
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
deleted file mode 100644
index 7de2b02..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/HiveDirectDataConnector.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-
-import scala.util.{Success, Try}
-
-// data connector for hive
-case class HiveDirectDataConnector(sqlContext: SQLContext, config: Map[String, Any],
- ruleExprs: RuleExprs, constFinalExprValueMap: Map[String, Any]
- ) extends DirectDataConnector {
-
- val Database = "database"
- val TableName = "table.name"
- val Partitions = "partitions"
-
- val database = config.getOrElse(Database, "").toString
- val tableName = config.getOrElse(TableName, "").toString
- val partitionsString = config.getOrElse(Partitions, "").toString
-
- val concreteTableName = if (dbPrefix) s"${database}.${tableName}" else tableName
- val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
-
- private def dbPrefix(): Boolean = {
- database.nonEmpty && !database.equals("default")
- }
-
- def available(): Boolean = {
- (!tableName.isEmpty) && {
- Try {
- if (dbPrefix) {
- sqlContext.tables(database).filter(tableExistsSql).collect.size
- } else {
- sqlContext.tables().filter(tableExistsSql).collect.size
- }
- } match {
- case Success(s) => s > 0
- case _ => false
- }
- }
- }
-
- def init(): Unit = {}
-
- def metaData(): Try[Iterable[(String, String)]] = {
- Try {
- val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
- val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
- if (partitionPos < 0) originRows
- else originRows.take(partitionPos)
- }
- }
-
- def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
- Try {
- sqlContext.sql(dataSql).flatMap { row =>
- // generate cache data
- val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
- val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
- // data info
- val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
- try {
- (info.key -> row.getAs[info.T](info.key))
- } catch {
- case e: Throwable => info.defWrap
- }
- }.toMap
-
- finalExprValueMaps.flatMap { finalExprValueMap =>
- val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
- expr.calculate(finalExprValueMap) match {
- case Some(v) => Some(v.asInstanceOf[AnyRef])
- case _ => None
- }
- }
- val key = toTuple(groupbyData)
-
- Some((key, (finalExprValueMap, dataInfoMap)))
- }
-
- // generate cache data
-// val cacheExprValueMap: Map[String, Any] = ruleExprs.cacheExprs.foldLeft(constFinalExprValueMap) { (cachedMap, expr) =>
-// ExprValueUtil.genExprValueMap(Some(row), expr, cachedMap)
-// }
-// val finalExprValueMap = ExprValueUtil.updateExprValueMap(ruleExprs.finalCacheExprs, cacheExprValueMap)
-//
-// // when clause filter data source
-// val whenResult = ruleExprs.whenClauseExprOpt match {
-// case Some(whenClause) => whenClause.calculate(finalExprValueMap)
-// case _ => None
-// }
-//
-// // get groupby data
-// whenResult match {
-// case Some(false) => None
-// case _ => {
-// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
-// expr.calculate(finalExprValueMap) match {
-// case Some(v) => Some(v.asInstanceOf[AnyRef])
-// case _ => None
-// }
-// }
-// val key = toTuple(groupbyData)
-//
-// Some((key, finalExprValueMap))
-// }
-// }
- }
- }
- }
-
- private def tableExistsSql(): String = {
-// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
- s"tableName LIKE '${tableName}'"
- }
-
- private def metaDataSql(): String = {
- s"DESCRIBE ${concreteTableName}"
- }
-
- private def dataSql(): String = {
- val clauses = partitions.map { prtn =>
- val cls = prtn.mkString(" AND ")
- if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
- else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
- }
- clauses.mkString(" UNION ALL ")
- }
-
- private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
- if (as.size > 0) {
- val tupleClass = Class.forName("scala.Tuple" + as.size)
- tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
- } else None
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
deleted file mode 100644
index d2534cc..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/KafkaCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.config.params.user.DataConnectorParam
-import org.apache.griffin.measure.connector.DataConnectorFactory
-import org.apache.griffin.measure.connector.cache.CacheDataConnector
-import org.apache.griffin.measure.connector.streaming.StreamingDataConnector
-import org.apache.griffin.measure.result._
-import org.apache.griffin.measure.rule._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.streaming.StreamingContext
-
-import scala.util.{Failure, Success, Try}
-
-case class KafkaCacheDirectDataConnector(@transient streamingDataConnectorTry: Try[StreamingDataConnector],
- cacheDataConnectorTry: Try[CacheDataConnector],
- dataConnectorParam: DataConnectorParam,
- ruleExprs: RuleExprs,
- constFinalExprValueMap: Map[String, Any]
- ) extends StreamingCacheDirectDataConnector {
-
- val cacheDataConnector: CacheDataConnector = cacheDataConnectorTry match {
- case Success(cntr) => cntr
- case Failure(ex) => throw ex
- }
- @transient val streamingDataConnector: StreamingDataConnector = streamingDataConnectorTry match {
- case Success(cntr) => cntr
- case Failure(ex) => throw ex
- }
-
- protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
- ms: Long
- ): RDD[Map[String, Any]] = {
- val dataInfoMap = DataInfo.cacheInfoList.map(_.defWrap).toMap + TimeStampInfo.wrap(ms)
-
- rdd.flatMap { kv =>
- val msg = kv._2
-
- val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(msg), ruleExprs.cacheExprs, constFinalExprValueMap)
- val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
-
- finalExprValueMaps.map { vm =>
- vm ++ dataInfoMap
- }
- }
- }
-
- def metaData(): Try[Iterable[(String, String)]] = Try {
- Map.empty[String, String]
- }
-
- def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = Try {
- cacheDataConnector.readData match {
- case Success(rdd) => {
- rdd.flatMap { row =>
- val finalExprValueMap = ruleExprs.finalCacheExprs.flatMap { expr =>
- row.get(expr._id).flatMap { d =>
- Some((expr._id, d))
- }
- }.toMap
-
- val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
- row.get(info.key) match {
- case Some(d) => (info.key -> d)
- case _ => info.defWrap
- }
- }.toMap
-
- val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
- expr.calculate(finalExprValueMap) match {
- case Some(v) => Some(v.asInstanceOf[AnyRef])
- case _ => None
- }
- }
- val key = toTuple(groupbyData)
-
- Some((key, (finalExprValueMap, dataInfoMap)))
- }
- }
- case Failure(ex) => throw ex
- }
- }
-
- override def cleanOldData(): Unit = {
- cacheDataConnector.cleanOldData
- }
-
- override def updateOldData(t: Long, oldData: Iterable[Map[String, Any]]): Unit = {
- if (dataConnectorParam.getMatchOnce) {
- cacheDataConnector.updateOldData(t, oldData)
- }
- }
-
- override def updateAllOldData(oldRdd: RDD[Map[String, Any]]): Unit = {
- if (dataConnectorParam.getMatchOnce) {
- cacheDataConnector.updateAllOldData(oldRdd)
- }
- }
-
- private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
- if (as.size > 0) {
- val tupleClass = Class.forName("scala.Tuple" + as.size)
- tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
- } else None
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
deleted file mode 100644
index 87139d6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/direct/StreamingCacheDirectDataConnector.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.direct
-
-import org.apache.griffin.measure.connector.cache.CacheDataConnector
-import org.apache.griffin.measure.connector.streaming.StreamingDataConnector
-import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.ExprValueUtil
-import org.apache.spark.rdd.RDD
-
-import scala.util.{Failure, Success}
-
-trait StreamingCacheDirectDataConnector extends DirectDataConnector {
-
- val cacheDataConnector: CacheDataConnector
- @transient val streamingDataConnector: StreamingDataConnector
-
- def available(): Boolean = {
- cacheDataConnector.available && streamingDataConnector.available
- }
-
- def init(): Unit = {
- cacheDataConnector.init
-
- val ds = streamingDataConnector.stream match {
- case Success(dstream) => dstream
- case Failure(ex) => throw ex
- }
-
- ds.foreachRDD((rdd, time) => {
- val ms = time.milliseconds
-
- val valueMapRdd = transform(rdd, ms)
-
- // save data frame
- cacheDataConnector.saveData(valueMapRdd, ms)
- })
- }
-
- protected def transform(rdd: RDD[(streamingDataConnector.K, streamingDataConnector.V)],
- ms: Long
- ): RDD[Map[String, Any]]
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
deleted file mode 100644
index fdd511d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/KafkaStreamingDataConnector.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.streaming
-
-import kafka.serializer.Decoder
-import org.apache.griffin.measure.connector.cache.{CacheDataConnector, DataCacheable}
-import org.apache.griffin.measure.result.{DataInfo, TimeStampInfo}
-import org.apache.griffin.measure.rule.{ExprValueUtil, RuleExprs}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.{Failure, Success, Try}
-
-abstract class KafkaStreamingDataConnector(@transient ssc: StreamingContext,
- config: Map[String, Any]
- ) extends StreamingDataConnector {
- type KD <: Decoder[K]
- type VD <: Decoder[V]
-
- val KafkaConfig = "kafka.config"
- val Topics = "topics"
-
- val kafkaConfig = config.get(KafkaConfig) match {
- case Some(map: Map[String, Any]) => map.mapValues(_.toString).map(identity)
- case _ => Map[String, String]()
- }
- val topics = config.getOrElse(Topics, "").toString
-
- def available(): Boolean = {
- true
- }
-
- def init(): Unit = {}
-
- def stream(): Try[InputDStream[(K, V)]] = Try {
- val topicSet = topics.split(",").toSet
- createDStream(topicSet)
- }
-
- protected def createDStream(topicSet: Set[String]): InputDStream[(K, V)]
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
deleted file mode 100644
index c37caac..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/connector/streaming/StreamingDataConnector.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.connector.streaming
-
-import org.apache.griffin.measure.connector.DataConnector
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.Try
-
-
-trait StreamingDataConnector extends DataConnector {
-
- type K
- type V
-
- def stream(): Try[InputDStream[(K, V)]]
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
new file mode 100644
index 0000000..534fb1b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnector.scala
@@ -0,0 +1,114 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.log.Loggable
+import org.apache.griffin.measure.process.engine._
+import org.apache.griffin.measure.rule.adaptor.{PreProcPhase, RuleAdaptorGroup, RunPhase}
+import org.apache.griffin.measure.rule.dsl._
+import org.apache.griffin.measure.rule.preproc.PreProcRuleGenerator
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+
+trait DataConnector extends Loggable with Serializable {
+
+// def available(): Boolean
+
+ def init(): Unit
+
+ def data(ms: Long): Option[DataFrame]
+
+ val dqEngines: DqEngines
+
+ val dcParam: DataConnectorParam
+
+ val sqlContext: SQLContext
+
+ val id: String = DataConnectorIdGenerator.genId
+
+ protected def suffix(ms: Long): String = s"${id}_${ms}"
+ protected def thisName(ms: Long): String = s"this_${suffix(ms)}"
+
+ final val tmstColName = GroupByColumn.tmst
+
+ def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
+ val thisTable = thisName(ms)
+ val preProcRules = PreProcRuleGenerator.genPreProcRules(dcParam.preProc, suffix(ms))
+ val names = PreProcRuleGenerator.getRuleNames(preProcRules).toSet + thisTable
+
+ try {
+ dfOpt.flatMap { df =>
+ // in data
+ df.registerTempTable(thisTable)
+
+ // generate rule steps
+ val ruleSteps = RuleAdaptorGroup.genConcreteRuleSteps(preProcRules, DslType("spark-sql"), PreProcPhase)
+
+ // run rules
+ dqEngines.runRuleSteps(ruleSteps)
+
+ // out data
+ val outDf = sqlContext.table(thisTable)
+
+ // drop temp table
+ names.foreach { name =>
+ try {
+ sqlContext.dropTempTable(name)
+ } catch {
+ case e: Throwable => warn(s"drop temp table ${name} fails")
+ }
+ }
+
+ // add tmst
+ val withTmstDf = outDf.withColumn(tmstColName, lit(ms))
+
+ Some(withTmstDf)
+ }
+ } catch {
+ case e: Throwable => {
+ error(s"preporcess of data connector [${id}] error: ${e.getMessage}")
+ None
+ }
+ }
+
+ }
+
+}
+
+object DataConnectorIdGenerator {
+ private val counter: AtomicLong = new AtomicLong(0L)
+ private val head: String = "dc"
+
+ def genId: String = {
+ s"${head}${increment}"
+ }
+
+ private def increment: Long = {
+ counter.incrementAndGet()
+ }
+}
+
+object GroupByColumn {
+ val tmst = "__tmst"
+}
\ No newline at end of file
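[editor's sketch, not part of the patch] The new DataConnector.preProcess above registers the incoming frame under a per-connector, per-timestamp temp table, runs the pre-proc rule steps against it, then tags the result with the shared "__tmst" group-by column. A minimal standalone Spark 1.x sketch of that tagging step follows; the table name, sample rows and master setting are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.lit

object TmstTagSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("tmst-tag-sketch"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val ms = System.currentTimeMillis

    // hypothetical input frame standing in for one batch from a data source
    val df = sc.parallelize(Seq(("a", 1), ("b", 2))).toDF("name", "value")

    // register under a per-connector, per-timestamp name, like thisName(ms) above
    df.registerTempTable(s"this_dc1_${ms}")

    // read the (possibly pre-processed) table back and append the "__tmst" column
    val withTmst = sqlContext.table(s"this_dc1_${ms}").withColumn("__tmst", lit(ms))
    withTmst.show()

    sc.stop()
  }
}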
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..9c3383f
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/DataConnectorFactory.scala
@@ -0,0 +1,150 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.config.params.user._
+import org.apache.griffin.measure.data.connector.streaming.{KafkaStreamingDataConnector, KafkaStreamingStringDataConnector, StreamingDataConnector}
+import org.apache.griffin.measure.process.engine.{DqEngine, DqEngines}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.DataFrame
+
+import scala.util.Success
+//import org.apache.griffin.measure.data.connector.cache._
+import org.apache.griffin.measure.data.connector.batch._
+//import org.apache.griffin.measure.data.connector.streaming._
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+object DataConnectorFactory {
+
+ val HiveRegex = """^(?i)hive$""".r
+ val AvroRegex = """^(?i)avro$""".r
+ val TextDirRegex = """^(?i)text-dir$""".r
+
+ val KafkaRegex = """^(?i)kafka$""".r
+
+ val TextRegex = """^(?i)text$""".r
+
+ def getDataConnector(sqlContext: SQLContext,
+ @transient ssc: StreamingContext,
+ dqEngines: DqEngines,
+ dataConnectorParam: DataConnectorParam
+ ): Try[DataConnector] = {
+ val conType = dataConnectorParam.conType
+ val version = dataConnectorParam.version
+ val config = dataConnectorParam.config
+ Try {
+ conType match {
+ case HiveRegex() => HiveBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
+ case AvroRegex() => AvroBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
+ case TextDirRegex() => TextDirBatchDataConnector(sqlContext, dqEngines, dataConnectorParam)
+ case KafkaRegex() => {
+// val ksdcTry = getStreamingDataConnector(ssc, dataConnectorParam)
+// val cdcTry = getCacheDataConnector(sqlContext, dataConnectorParam.cache)
+// KafkaCacheDirectDataConnector(ksdcTry, cdcTry, dataConnectorParam)
+ getStreamingDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
+ }
+ case _ => throw new Exception("connector creation error!")
+ }
+ }
+ }
+
+ private def getStreamingDataConnector(sqlContext: SQLContext,
+ @transient ssc: StreamingContext,
+ dqEngines: DqEngines,
+ dataConnectorParam: DataConnectorParam
+ ): StreamingDataConnector = {
+ if (ssc == null) throw new Exception("streaming context is null!")
+ val conType = dataConnectorParam.conType
+ val version = dataConnectorParam.version
+ conType match {
+ case KafkaRegex() => genKafkaDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
+ case _ => throw new Exception("streaming connector creation error!")
+ }
+ }
+//
+// private def getCacheDataConnector(sqlContext: SQLContext,
+// dataCacheParam: DataCacheParam
+// ): Try[CacheDataConnector] = {
+// if (dataCacheParam == null) {
+// throw new Exception("invalid data cache param!")
+// }
+// val cacheType = dataCacheParam.cacheType
+// Try {
+// cacheType match {
+// case HiveRegex() => HiveCacheDataConnector(sqlContext, dataCacheParam)
+// case TextRegex() => TextCacheDataConnector(sqlContext, dataCacheParam)
+// case _ => throw new Exception("cache connector creation error!")
+// }
+// }
+// }
+//
+ private def genKafkaDataConnector(sqlContext: SQLContext,
+ @transient ssc: StreamingContext,
+ dqEngines: DqEngines,
+ dataConnectorParam: DataConnectorParam
+ ) = {
+ val config = dataConnectorParam.config
+ val KeyType = "key.type"
+ val ValueType = "value.type"
+ val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+ val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+ (getClassTag(keyType), getClassTag(valueType)) match {
+ case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
+ KafkaStreamingStringDataConnector(sqlContext, ssc, dqEngines, dataConnectorParam)
+ }
+ case _ => {
+ throw new Exception("not supported type kafka data connector")
+ }
+ }
+ }
+
+ private def getClassTag(tp: String): ClassTag[_] = {
+ try {
+ val clazz = Class.forName(tp)
+ ClassTag(clazz)
+ } catch {
+ case e: Throwable => throw e
+ }
+ }
+
+ def filterBatchDataConnectors(connectors: Seq[DataConnector]): Seq[BatchDataConnector] = {
+ connectors.flatMap { dc =>
+ dc match {
+ case mdc: BatchDataConnector => Some(mdc)
+ case _ => None
+ }
+ }
+ }
+ def filterStreamingDataConnectors(connectors: Seq[DataConnector]): Seq[StreamingDataConnector] = {
+ connectors.flatMap { dc =>
+ dc match {
+ case mdc: StreamingDataConnector => Some(mdc)
+ case _ => None
+ }
+ }
+ }
+
+}
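[editor's sketch, not part of the patch] DataConnectorFactory dispatches on the "type" string of each data connector with case-insensitive anchored regexes, so "HIVE", "Hive" and "hive" all resolve to the same branch. A small self-contained sketch of that dispatch technique, with example descriptions only:

object ConTypeDispatchSketch {
  val HiveRegex = """^(?i)hive$""".r
  val AvroRegex = """^(?i)avro$""".r
  val KafkaRegex = """^(?i)kafka$""".r

  // whole-string regex patterns used as extractors in a match, as in the factory above
  def describe(conType: String): String = conType match {
    case HiveRegex() => "batch connector backed by a hive table"
    case AvroRegex() => "batch connector backed by an avro file"
    case KafkaRegex() => "streaming connector backed by kafka"
    case _ => throw new Exception("connector creation error!")
  }

  def main(args: Array[String]): Unit = {
    Seq("HIVE", "Avro", "kafka").foreach(t => println(s"$t -> ${describe(t)}"))
  }
}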
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
new file mode 100644
index 0000000..ccd6441
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/AvroBatchDataConnector.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.data.connector._
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.result._
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+import scala.util.Try
+
+// data connector for avro file
+case class AvroBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam
+ ) extends BatchDataConnector {
+
+ val config = dcParam.config
+
+ val FilePath = "file.path"
+ val FileName = "file.name"
+
+ val filePath = config.getString(FilePath, "")
+ val fileName = config.getString(FileName, "")
+
+ val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+ private def pathPrefix(): Boolean = {
+ filePath.nonEmpty
+ }
+
+ private def fileExist(): Boolean = {
+ HdfsUtil.existPath(concreteFileFullPath)
+ }
+
+ def data(ms: Long): Option[DataFrame] = {
+ try {
+ val df = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable => {
+ error(s"load avro file ${concreteFileFullPath} fails")
+ None
+ }
+ }
+ }
+
+// def available(): Boolean = {
+// (!concreteFileFullPath.isEmpty) && fileExist
+// }
+
+// def init(): Unit = {}
+
+// def metaData(): Try[Iterable[(String, String)]] = {
+// Try {
+// val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+// st.fields.map(f => (f.name, f.dataType.typeName))
+// }
+// }
+
+// def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+// Try {
+// loadDataFile.flatMap { row =>
+// // generate cache data
+// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
+// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+//
+// // data info
+// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+// try {
+// (info.key -> row.getAs[info.T](info.key))
+// } catch {
+// case e: Throwable => info.defWrap
+// }
+// }.toMap
+//
+// finalExprValueMaps.flatMap { finalExprValueMap =>
+// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+// expr.calculate(finalExprValueMap) match {
+// case Some(v) => Some(v.asInstanceOf[AnyRef])
+// case _ => None
+// }
+// }
+// val key = toTuple(groupbyData)
+//
+// Some((key, (finalExprValueMap, dataInfoMap)))
+// }
+// }
+// }
+// }
+
+}
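[editor's sketch, not part of the patch] The avro connector builds its load path from the "file.path" and "file.name" config keys, falling back to the bare file name when no path prefix is given. A tiny sketch of that composition; the sample values are hypothetical, and "file.path" is assumed to carry its own trailing slash since the two parts are concatenated directly.

object AvroPathSketch {
  def concretePath(config: Map[String, Any]): String = {
    val filePath = config.getOrElse("file.path", "").toString
    val fileName = config.getOrElse("file.name", "").toString
    // same rule as concreteFileFullPath above: prefix only when "file.path" is non-empty
    if (filePath.nonEmpty) s"${filePath}${fileName}" else fileName
  }

  def main(args: Array[String]): Unit = {
    val config = Map("file.path" -> "hdfs:///griffin/data/", "file.name" -> "source.avro")
    println(concretePath(config)) // hdfs:///griffin/data/source.avro
  }
}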
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
new file mode 100644
index 0000000..4d138ab
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/BatchDataConnector.scala
@@ -0,0 +1,35 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.data.connector._
+//import org.apache.griffin.measure.data.connector.cache.DataUpdatable
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.types.StructType
+
+import scala.util.{Failure, Success, Try}
+
+
+trait BatchDataConnector extends DataConnector {
+
+// def metaData(): Option[StructType]
+
+ def init(): Unit = {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/4aa6f779/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
new file mode 100644
index 0000000..5d80d0e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/data/connector/batch/HiveBatchDataConnector.scala
@@ -0,0 +1,149 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.data.connector.batch
+
+import org.apache.griffin.measure.config.params.user.DataConnectorParam
+import org.apache.griffin.measure.data.connector._
+import org.apache.griffin.measure.process.engine.DqEngines
+import org.apache.griffin.measure.result._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
+
+import scala.util.{Success, Try}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+// data connector for hive
+case class HiveBatchDataConnector(sqlContext: SQLContext, dqEngines: DqEngines, dcParam: DataConnectorParam
+ ) extends BatchDataConnector {
+
+ val config = dcParam.config
+
+ if (!sqlContext.isInstanceOf[HiveContext]) {
+ throw new Exception("hive context not prepared!")
+ }
+
+ val Database = "database"
+ val TableName = "table.name"
+ val Partitions = "partitions"
+
+ val database = config.getString(Database, "default")
+ val tableName = config.getString(TableName, "")
+ val partitionsString = config.getString(Partitions, "")
+
+ val concreteTableName = s"${database}.${tableName}"
+ val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
+
+ def data(ms: Long): Option[DataFrame] = {
+ try {
+ val df = sqlContext.sql(dataSql)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable => {
+ error(s"load hive table ${concreteTableName} fails")
+ None
+ }
+ }
+ }
+
+// def available(): Boolean = {
+// (!tableName.isEmpty) && {
+// Try {
+// if (dbPrefix) {
+// sqlContext.tables(database).filter(tableExistsSql).collect.size
+// } else {
+// sqlContext.tables().filter(tableExistsSql).collect.size
+// }
+// } match {
+// case Success(s) => s > 0
+// case _ => false
+// }
+// }
+// }
+
+// def init(): Unit = {}
+
+// def metaData(): Try[Iterable[(String, String)]] = {
+// Try {
+// val originRows = sqlContext.sql(metaDataSql).map(r => (r.getString(0), r.getString(1))).collect
+// val partitionPos: Int = originRows.indexWhere(pair => pair._1.startsWith("# "))
+// if (partitionPos < 0) originRows
+// else originRows.take(partitionPos)
+// }
+// }
+
+// def data(): Try[RDD[(Product, (Map[String, Any], Map[String, Any]))]] = {
+// Try {
+// sqlContext.sql(dataSql).flatMap { row =>
+// // generate cache data
+// val cacheExprValueMaps = ExprValueUtil.genExprValueMaps(Some(row), ruleExprs.cacheExprs, constFinalExprValueMap)
+// val finalExprValueMaps = ExprValueUtil.updateExprValueMaps(ruleExprs.finalCacheExprs, cacheExprValueMaps)
+//
+// // data info
+// val dataInfoMap: Map[String, Any] = DataInfo.cacheInfoList.map { info =>
+// try {
+// (info.key -> row.getAs[info.T](info.key))
+// } catch {
+// case e: Throwable => info.defWrap
+// }
+// }.toMap
+//
+// finalExprValueMaps.flatMap { finalExprValueMap =>
+// val groupbyData: Seq[AnyRef] = ruleExprs.groupbyExprs.flatMap { expr =>
+// expr.calculate(finalExprValueMap) match {
+// case Some(v) => Some(v.asInstanceOf[AnyRef])
+// case _ => None
+// }
+// }
+// val key = toTuple(groupbyData)
+//
+// Some((key, (finalExprValueMap, dataInfoMap)))
+// }
+// }
+// }
+// }
+
+ private def tableExistsSql(): String = {
+// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
+ s"tableName LIKE '${tableName}'"
+ }
+
+ private def metaDataSql(): String = {
+ s"DESCRIBE ${concreteTableName}"
+ }
+
+ private def dataSql(): String = {
+ val clauses = partitions.map { prtn =>
+ val cls = prtn.mkString(" AND ")
+ if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
+ else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
+ }
+ clauses.mkString(" UNION ALL ")
+ }
+
+// private def toTuple[A <: AnyRef](as: Seq[A]): Product = {
+// if (as.size > 0) {
+// val tupleClass = Class.forName("scala.Tuple" + as.size)
+// tupleClass.getConstructors.apply(0).newInstance(as: _*).asInstanceOf[Product]
+// } else None
+// }
+
+}
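[editor's sketch, not part of the patch] The hive connector's "partitions" config is a semicolon-separated list of partition clauses, each a comma-separated list of conditions; dataSql above turns every clause into its own SELECT and unions them. A standalone sketch of that expansion, using a made-up table name and partition values:

object HivePartitionSqlSketch {
  def dataSql(concreteTableName: String, partitionsString: String): String = {
    // ";" separates alternative partitions, "," separates ANDed conditions within one
    val partitions = partitionsString.split(";").map(s => s.split(",").map(_.trim))
    val clauses = partitions.map { prtn =>
      val cls = prtn.mkString(" AND ")
      if (cls.isEmpty) s"SELECT * FROM ${concreteTableName}"
      else s"SELECT * FROM ${concreteTableName} WHERE ${cls}"
    }
    clauses.mkString(" UNION ALL ")
  }

  def main(args: Array[String]): Unit = {
    println(dataSql("default.demo_src", "dt='20170930',hour='08';dt='20170930',hour='09'"))
    // SELECT * FROM default.demo_src WHERE dt='20170930' AND hour='08'
    //   UNION ALL SELECT * FROM default.demo_src WHERE dt='20170930' AND hour='09'
  }
}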