Posted to commits@griffin.apache.org by gu...@apache.org on 2018/06/14 04:52:51 UTC
[1/3] incubator-griffin git commit: refactor context and data source
Repository: incubator-griffin
Updated Branches:
refs/heads/master d63f6f4ae -> 6b389b316
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
new file mode 100644
index 0000000..f5439fc
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/AvroBatchDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * Batch data connector for Avro files
+ */
+case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage
+ ) extends BatchDataConnector {
+
+ val config = dcParam.getConfig
+
+ val FilePath = "file.path"
+ val FileName = "file.name"
+
+ val filePath = config.getString(FilePath, "")
+ val fileName = config.getString(FileName, "")
+
+ val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
+
+ private def pathPrefix(): Boolean = {
+ filePath.nonEmpty
+ }
+
+ private def fileExist(): Boolean = {
+ HdfsUtil.existPath(concreteFileFullPath)
+ }
+
+ def data(ms: Long): (Option[DataFrame], TimeRange) = {
+ val dfOpt = try {
+ val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable => {
+ error(s"load avro file ${concreteFileFullPath} fails")
+ None
+ }
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
+}
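
Note: the connector resolves its input path from two config keys; below is a minimal standalone sketch of that resolution, assuming a hypothetical config map (same rule as concreteFileFullPath above: the file name is prefixed only when file.path is non-empty):

    object AvroPathSketch {
      def main(args: Array[String]): Unit = {
        // hypothetical connector config using the "file.path" / "file.name" keys above
        val config = Map("file.path" -> "hdfs:///griffin/data/", "file.name" -> "users.avro")
        val filePath = config.getOrElse("file.path", "")
        val fileName = config.getOrElse("file.name", "")
        val concreteFileFullPath = if (filePath.nonEmpty) s"${filePath}${fileName}" else fileName
        println(concreteFileFullPath) // hdfs:///griffin/data/users.avro
      }
    }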
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala
new file mode 100644
index 0000000..8246f3b
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/BatchDataConnector.scala
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.datasource.connector.DataConnector
+
+trait BatchDataConnector extends DataConnector {
+
+ def init(): Unit = {}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
new file mode 100644
index 0000000..f6533b5
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/HiveBatchDataConnector.scala
@@ -0,0 +1,86 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * Batch data connector for Hive tables
+ */
+case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage
+ ) extends BatchDataConnector {
+
+ val config = dcParam.getConfig
+
+ val Database = "database"
+ val TableName = "table.name"
+ val Where = "where"
+
+ val database = config.getString(Database, "default")
+ val tableName = config.getString(TableName, "")
+ val whereString = config.getString(Where, "")
+
+ val concreteTableName = s"${database}.${tableName}"
+ val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
+
+ def data(ms: Long): (Option[DataFrame], TimeRange) = {
+ val dfOpt = try {
+ val dtSql = dataSql
+ info(dtSql)
+ val df = sparkSession.sql(dtSql)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } catch {
+ case e: Throwable => {
+ error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
+ None
+ }
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
+ private def tableExistsSql(): String = {
+// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
+ s"tableName LIKE '${tableName}'"
+ }
+
+ private def metaDataSql(): String = {
+ s"DESCRIBE ${concreteTableName}"
+ }
+
+ private def dataSql(): String = {
+ val tableClause = s"SELECT * FROM ${concreteTableName}"
+ if (wheres.length > 0) {
+ val clauses = wheres.map { w =>
+ s"${tableClause} WHERE ${w}"
+ }
+ clauses.mkString(" UNION ALL ")
+ } else tableClause
+ }
+
+}
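
Note: the comma-separated "where" option expands to one SELECT per clause, joined with UNION ALL; below is a standalone sketch of that expansion (table name and clauses are hypothetical):

    object HiveDataSqlSketch {
      def main(args: Array[String]): Unit = {
        val concreteTableName = "default.users"          // hypothetical
        val whereString = "dt='20180613', dt='20180614'" // hypothetical
        val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
        val tableClause = s"SELECT * FROM ${concreteTableName}"
        val dataSql =
          if (wheres.length > 0) wheres.map(w => s"${tableClause} WHERE ${w}").mkString(" UNION ALL ")
          else tableClause
        println(dataSql)
        // SELECT * FROM default.users WHERE dt='20180613' UNION ALL SELECT * FROM default.users WHERE dt='20180614'
      }
    }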
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
new file mode 100644
index 0000000..fd83c1e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/TextDirBatchDataConnector.scala
@@ -0,0 +1,106 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.batch
+
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.HdfsUtil
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * Batch data connector for a directory of text-format data located in sub-directories at the nth depth
+ */
+case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage
+ ) extends BatchDataConnector {
+
+ val config = dcParam.getConfig
+
+ val DirPath = "dir.path"
+ val DataDirDepth = "data.dir.depth"
+ val SuccessFile = "success.file"
+ val DoneFile = "done.file"
+
+ val dirPath = config.getString(DirPath, "")
+ val dataDirDepth = config.getInt(DataDirDepth, 0)
+ val successFile = config.getString(SuccessFile, "_SUCCESS")
+ val doneFile = config.getString(DoneFile, "_DONE")
+
+ val ignoreFilePrefix = "_"
+
+ private def dirExist(): Boolean = {
+ HdfsUtil.existPath(dirPath)
+ }
+
+ def data(ms: Long): (Option[DataFrame], TimeRange) = {
+ val dfOpt = try {
+ val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
+ // touch done file for read dirs
+ dataDirs.foreach(dir => touchDone(dir))
+
+ val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
+
+ if (validDataDirs.nonEmpty) {
+ val df = sparkSession.read.text(validDataDirs: _*)
+ val dfOpt = Some(df)
+ val preDfOpt = preProcess(dfOpt, ms)
+ preDfOpt
+ } else {
+ None
+ }
+ } catch {
+ case e: Throwable => {
+ error(s"load text dir ${dirPath} fails: ${e.getMessage}")
+ None
+ }
+ }
+ val tmsts = readTmst(ms)
+ (dfOpt, TimeRange(ms, tmsts))
+ }
+
+ private def listSubDirs(paths: Seq[String], depth: Int, filterFunc: (String) => Boolean): Seq[String] = {
+ val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
+ if (depth <= 0) {
+ subDirs.filter(filterFunc)
+ } else {
+ listSubDirs(subDirs, depth - 1, filterFunc)
+ }
+ }
+
+ private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
+ private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
+ private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
+
+ private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
+
+ private def emptyDir(dir: String): Boolean = {
+ HdfsUtil.listSubPathsByType(dir, "file").filter(!_.startsWith(ignoreFilePrefix)).isEmpty
+ }
+
+// def metaData(): Try[Iterable[(String, String)]] = {
+// Try {
+// val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
+// st.fields.map(f => (f.name, f.dataType.typeName))
+// }
+// }
+
+}
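
Note: listSubDirs descends data.dir.depth levels below dir.path before applying the readable filter; below is a sketch of the recursion against an in-memory tree standing in for HdfsUtil.listSubPathsByType (tree contents hypothetical):

    object ListSubDirsSketch {
      // hypothetical directory tree in place of HdfsUtil.listSubPathsByType(path, "dir", true)
      val tree = Map(
        "/data" -> Seq("/data/20180613", "/data/20180614"),
        "/data/20180613" -> Seq("/data/20180613/00"),
        "/data/20180614" -> Seq("/data/20180614/00", "/data/20180614/01"))

      def listSubDirs(paths: Seq[String], depth: Int, filterFunc: String => Boolean): Seq[String] = {
        val subDirs = paths.flatMap(p => tree.getOrElse(p, Nil))
        if (depth <= 0) subDirs.filter(filterFunc) else listSubDirs(subDirs, depth - 1, filterFunc)
      }

      def main(args: Array[String]): Unit = {
        // depth 1: collect the hour-level dirs under each day-level dir
        println(listSubDirs(Seq("/data"), 1, _ => true))
        // List(/data/20180613/00, /data/20180614/00, /data/20180614/01)
      }
    }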
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
new file mode 100644
index 0000000..1475898
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingDataConnector.scala
@@ -0,0 +1,85 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.streaming
+
+import kafka.serializer.Decoder
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.{Failure, Success, Try}
+import org.apache.griffin.measure.utils.ParamUtil._
+
+/**
+ * Streaming data connector for Kafka
+ */
+trait KafkaStreamingDataConnector extends StreamingDataConnector {
+
+ type KD <: Decoder[K]
+ type VD <: Decoder[V]
+ type OUT = (K, V)
+
+ val config = dcParam.getConfig
+
+ val KafkaConfig = "kafka.config"
+ val Topics = "topics"
+
+ val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
+ val topics = config.getString(Topics, "")
+
+ def init(): Unit = {
+ // register fan in
+ streamingCacheClientOpt.foreach(_.registerFanIn)
+
+ val ds = stream match {
+ case Success(dstream) => dstream
+ case Failure(ex) => throw ex
+ }
+ ds.foreachRDD((rdd, time) => {
+ val ms = time.milliseconds
+ val saveDfOpt = try {
+ // coalesce partition number
+ val prlCount = rdd.sparkContext.defaultParallelism
+ val ptnCount = rdd.getNumPartitions
+ val repartitionedRdd = if (prlCount < ptnCount) {
+ rdd.coalesce(prlCount)
+ } else rdd
+
+ val dfOpt = transform(repartitionedRdd)
+
+ // pre-process
+ preProcess(dfOpt, ms)
+ } catch {
+ case e: Throwable => {
+ error(s"streaming data connector error: ${e.getMessage}")
+ None
+ }
+ }
+
+ // save data frame
+ streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
+ })
+ }
+
+ def stream(): Try[InputDStream[OUT]] = Try {
+ val topicSet = topics.split(",").toSet
+ createDStream(topicSet)
+ }
+
+ protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
+
+}
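
Note: init only ever shrinks the partition count (coalesce down to the default parallelism), never repartitions upward; below is a minimal sketch of that decision, with hypothetical counts:

    object CoalesceDecisionSketch {
      // mirrors the rule above: coalesce when partitions exceed parallelism, else leave the RDD alone
      def targetPartitions(defaultParallelism: Int, rddPartitions: Int): Int =
        if (defaultParallelism < rddPartitions) defaultParallelism else rddPartitions

      def main(args: Array[String]): Unit = {
        println(targetPartitions(8, 32)) // 8  -> coalesce
        println(targetPartitions(8, 4))  // 4  -> unchanged
      }
    }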
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
new file mode 100644
index 0000000..c24aadc
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
@@ -0,0 +1,71 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.streaming
+
+import kafka.serializer.StringDecoder
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, _}
+import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.streaming.dstream.InputDStream
+import org.apache.spark.streaming.kafka.KafkaUtils
+
+/**
+ * Streaming data connector for Kafka with String-formatted keys and values
+ */
+case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSession,
+ @transient ssc: StreamingContext,
+ dcParam: DataConnectorParam,
+ timestampStorage: TimestampStorage,
+ streamingCacheClientOpt: Option[StreamingCacheClient]
+ ) extends KafkaStreamingDataConnector {
+
+ type K = String
+ type KD = StringDecoder
+ type V = String
+ type VD = StringDecoder
+
+ val valueColName = "value"
+ val schema = StructType(Array(
+ StructField(valueColName, StringType)
+ ))
+
+ def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
+ KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
+ }
+
+ def transform(rdd: RDD[OUT]): Option[DataFrame] = {
+ if (rdd.isEmpty) None else {
+ try {
+ val rowRdd = rdd.map(d => Row(d._2))
+ val df = sparkSession.createDataFrame(rowRdd, schema)
+ Some(df)
+ } catch {
+ case e: Throwable => {
+ error(s"streaming data transform fails")
+ None
+ }
+ }
+ }
+ }
+
+}
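
Note: transform drops the Kafka key and wraps each message value into a one-column DataFrame; below is a sketch of the same shape in a local Spark session (session setup and sample records are hypothetical):

    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.{Row, SparkSession}

    object TransformSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder.master("local[1]").appName("transform-sketch").getOrCreate()
        // hypothetical (key, value) pairs as they would arrive from the direct stream
        val records = Seq(("k1", """{"id":1}"""), ("k2", """{"id":2}"""))
        val rowRdd = spark.sparkContext.parallelize(records).map(d => Row(d._2)) // keep value only
        val schema = StructType(Array(StructField("value", StringType)))
        spark.createDataFrame(rowRdd, schema).show(false)
        spark.stop()
      }
    }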
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
new file mode 100644
index 0000000..5c2170c
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/streaming/StreamingDataConnector.scala
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector.streaming
+
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.DataConnector
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.streaming.dstream.InputDStream
+
+import scala.util.Try
+
+trait StreamingDataConnector extends DataConnector {
+
+ type K
+ type V
+ type OUT
+
+ protected def stream(): Try[InputDStream[OUT]]
+
+ // transform rdd to dataframe
+ def transform(rdd: RDD[OUT]): Option[DataFrame]
+
+ // streaming data connector cannot directly read data frame
+ def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
+
+ val streamingCacheClientOpt: Option[StreamingCacheClient]
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
index 06892a3..1174dbb 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/batch/BatchDQApp.scala
@@ -23,7 +23,7 @@ import java.util.Date
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params._
import org.apache.griffin.measure.context._
-import org.apache.griffin.measure.context.datasource.DataSourceFactory
+import org.apache.griffin.measure.datasource.DataSourceFactory
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
import org.apache.griffin.measure.step.builder.udf.GriffinUDFAgent
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
index f990c8e..73fc7a0 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/launch/streaming/StreamingDQApp.scala
@@ -25,8 +25,8 @@ import org.apache.griffin.measure.Loggable
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params._
import org.apache.griffin.measure.context._
-import org.apache.griffin.measure.context.datasource.DataSourceFactory
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
+import org.apache.griffin.measure.datasource.DataSourceFactory
+import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
import org.apache.griffin.measure.context.streaming.metric.CacheResults
import org.apache.griffin.measure.job.builder.DQJobBuilder
import org.apache.griffin.measure.launch.DQApp
@@ -68,8 +68,8 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
clearCpDir
// init info cache instance
- InfoCacheInstance.initInstance(envParam.infoCacheParams, metricName)
- InfoCacheInstance.init
+ OffsetCacheClient.initClient(envParam.offsetCacheParams, metricName)
+ OffsetCacheClient.init
// register udf
GriffinUDFAgent.register(sqlContext)
@@ -163,7 +163,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
evaluateRuleParam: EvaluateRuleParam
) extends Runnable with Loggable {
- val lock = InfoCacheInstance.genLock("process")
+ val lock = OffsetCacheClient.genLock("process")
val appPersist = globalContext.getPersist()
def run(): Unit = {
@@ -174,7 +174,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
if (locked) {
try {
- TimeInfoCache.startTimeInfoCache
+ OffsetCacheClient.startOffsetCache
val startTime = new Date().getTime
appPersist.log(startTime, s"starting process ...")
@@ -196,7 +196,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
val endTime = new Date().getTime
appPersist.log(endTime, s"process using time: ${endTime - startTime} ms")
- TimeInfoCache.endTimeInfoCache
+ OffsetCacheClient.endOffsetCache
// clean old data
cleanData(dqContext)
@@ -225,7 +225,7 @@ case class StreamingDQApp(allParam: GriffinConfig) extends DQApp {
context.clean()
- val cleanTime = TimeInfoCache.getCleanTime
+ val cleanTime = OffsetCacheClient.getCleanTime
CacheResults.refresh(cleanTime)
} catch {
case e: Throwable => error(s"clean data error: ${e.getMessage}")
[3/3] incubator-griffin git commit: refactor context and data source
Posted by gu...@apache.org.
refactor context and data source
Author: Lionel Liu <bh...@163.com>
Author: dodobel <12...@qq.com>
Closes #300 from bhlx3lyx7/spark2.
Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/6b389b31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/6b389b31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/6b389b31
Branch: refs/heads/master
Commit: 6b389b316061e55c19ccb171615fd7e3689f59ce
Parents: d63f6f4
Author: Lionel Liu <bh...@163.com>
Authored: Thu Jun 14 12:52:44 2018 +0800
Committer: Lionel Liu <bh...@163.com>
Committed: Thu Jun 14 12:52:44 2018 +0800
----------------------------------------------------------------------
.../configuration/params/EnvConfig.scala | 18 +-
.../griffin/measure/context/DQContext.scala | 2 +-
.../measure/context/datasource/DataSource.scala | 97 -----
.../context/datasource/DataSourceFactory.scala | 67 ----
.../datasource/cache/StreamingCacheClient.scala | 366 -------------------
.../cache/StreamingCacheClientFactory.scala | 68 ----
.../cache/StreamingCacheJsonClient.scala | 40 --
.../cache/StreamingCacheOrcClient.scala | 40 --
.../cache/StreamingCacheParquetClient.scala | 42 ---
.../context/datasource/cache/WithFanIn.scala | 69 ----
.../datasource/connector/DataConnector.scala | 112 ------
.../connector/DataConnectorFactory.scala | 125 -------
.../batch/AvroBatchDataConnector.scala | 71 ----
.../connector/batch/BatchDataConnector.scala | 27 --
.../batch/HiveBatchDataConnector.scala | 86 -----
.../batch/TextDirBatchDataConnector.scala | 106 ------
.../streaming/KafkaStreamingDataConnector.scala | 85 -----
.../KafkaStreamingStringDataConnector.scala | 71 ----
.../streaming/StreamingDataConnector.scala | 46 ---
.../datasource/info/DataSourceCacheable.scala | 88 -----
.../context/datasource/info/TmstCache.scala | 47 ---
.../context/streaming/info/InfoCache.scala | 39 --
.../streaming/info/InfoCacheFactory.scala | 41 ---
.../streaming/info/InfoCacheInstance.scala | 53 ---
.../context/streaming/info/TimeInfoCache.scala | 127 -------
.../context/streaming/info/ZKInfoCache.scala | 217 -----------
.../context/streaming/lock/CacheLockInZK.scala | 53 +++
.../context/streaming/lock/CacheLockSeq.scala | 33 ++
.../context/streaming/lock/MultiCacheLock.scala | 39 --
.../context/streaming/lock/ZKCacheLock.scala | 53 ---
.../context/streaming/offset/OffsetCache.scala | 39 ++
.../streaming/offset/OffsetCacheClient.scala | 54 +++
.../streaming/offset/OffsetCacheFactory.scala | 39 ++
.../streaming/offset/OffsetCacheInZK.scala | 214 +++++++++++
.../context/streaming/offset/OffsetOps.scala | 125 +++++++
.../griffin/measure/datasource/DataSource.scala | 96 +++++
.../measure/datasource/DataSourceFactory.scala | 66 ++++
.../measure/datasource/TimestampStorage.scala | 47 +++
.../datasource/cache/StreamingCacheClient.scala | 366 +++++++++++++++++++
.../cache/StreamingCacheClientFactory.scala | 68 ++++
.../cache/StreamingCacheJsonClient.scala | 40 ++
.../cache/StreamingCacheOrcClient.scala | 40 ++
.../cache/StreamingCacheParquetClient.scala | 42 +++
.../cache/StreamingOffsetCacheable.scala | 88 +++++
.../measure/datasource/cache/WithFanIn.scala | 69 ++++
.../datasource/connector/DataConnector.scala | 112 ++++++
.../connector/DataConnectorFactory.scala | 125 +++++++
.../batch/AvroBatchDataConnector.scala | 71 ++++
.../connector/batch/BatchDataConnector.scala | 27 ++
.../batch/HiveBatchDataConnector.scala | 86 +++++
.../batch/TextDirBatchDataConnector.scala | 106 ++++++
.../streaming/KafkaStreamingDataConnector.scala | 85 +++++
.../KafkaStreamingStringDataConnector.scala | 71 ++++
.../streaming/StreamingDataConnector.scala | 46 +++
.../measure/launch/batch/BatchDQApp.scala | 2 +-
.../launch/streaming/StreamingDQApp.scala | 16 +-
56 files changed, 2227 insertions(+), 2241 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
index bc6d50f..fc4b984 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/configuration/params/EnvConfig.scala
@@ -27,22 +27,22 @@ import org.apache.griffin.measure.utils.TimeUtil
* environment param
* @param sparkParam config of spark environment (must)
* @param persistParams config of persist ways (optional)
- * @param infoCacheParams config of information cache ways (required in streaming mode)
+ * @param offsetCacheParams config of offset cache ways (required in streaming mode)
*/
@JsonInclude(Include.NON_NULL)
case class EnvConfig(@JsonProperty("spark") sparkParam: SparkParam,
@JsonProperty("persist") persistParams: List[PersistParam],
- @JsonProperty("info.cache") infoCacheParams: List[InfoCacheParam]
+ @JsonProperty("info.cache") offsetCacheParams: List[OffsetCacheParam]
) extends Param {
def getSparkParam: SparkParam = sparkParam
def getPersistParams: Seq[PersistParam] = if (persistParams != null) persistParams else Nil
- def getInfoCacheParams: Seq[InfoCacheParam] = if (infoCacheParams != null) infoCacheParams else Nil
+ def getOffsetCacheParams: Seq[OffsetCacheParam] = if (offsetCacheParams != null) offsetCacheParams else Nil
def validate(): Unit = {
assert((sparkParam != null), "spark param should not be null")
sparkParam.validate
getPersistParams.foreach(_.validate)
- getInfoCacheParams.foreach(_.validate)
+ getOffsetCacheParams.foreach(_.validate)
}
}
@@ -95,14 +95,14 @@ case class PersistParam( @JsonProperty("type") persistType: String,
}
/**
- * info cache param
- * @param cacheType information cache type, e.g.: zookeeper (must)
+ * offset cache param
+ * @param cacheType offset cache type, e.g.: zookeeper (must)
* @param config config of cache way
*/
@JsonInclude(Include.NON_NULL)
-case class InfoCacheParam( @JsonProperty("type") cacheType: String,
- @JsonProperty("config") config: Map[String, Any]
- ) extends Param {
+case class OffsetCacheParam(@JsonProperty("type") cacheType: String,
+ @JsonProperty("config") config: Map[String, Any]
+ ) extends Param {
def getType: String = cacheType
def getConfig: Map[String, Any] = if (config != null) config else Map[String, Any]()
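
Note: only the Scala-side names change in this file; the JSON key is still "info.cache", so existing environment configs bind to offsetCacheParams unchanged. A hypothetical env-config fragment for illustration:

    object EnvConfigSketch {
      // hypothetical fragment; the "info.cache" key maps to offsetCacheParams via @JsonProperty
      val envJson: String =
        """{
          |  "info.cache": [
          |    { "type": "zk", "config": { "hosts": "localhost:2181", "namespace": "griffin/infocache" } }
          |  ]
          |}""".stripMargin

      def main(args: Array[String]): Unit = println(envJson)
    }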
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
index 1d7dc62..926bde1 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/DQContext.scala
@@ -20,7 +20,7 @@ package org.apache.griffin.measure.context
import org.apache.griffin.measure.configuration.enums._
import org.apache.griffin.measure.configuration.params._
-import org.apache.griffin.measure.context.datasource._
+import org.apache.griffin.measure.datasource._
import org.apache.griffin.measure.context.writer._
import org.apache.spark.sql.{Encoders, SQLContext, SparkSession}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
deleted file mode 100644
index 0ca61aa..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSource.scala
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
-import org.apache.griffin.measure.context.datasource.connector.DataConnector
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.spark.sql._
-
-/**
- * data source
- * @param name name of data source
- * @param dsParam param of this data source
- * @param dataConnectors list of data connectors
- * @param streamingCacheClientOpt streaming data cache client option
- */
-case class DataSource(name: String,
- dsParam: DataSourceParam,
- dataConnectors: Seq[DataConnector],
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ) extends Loggable with Serializable {
-
- def init(): Unit = {
- dataConnectors.foreach(_.init)
- }
-
- def loadData(context: DQContext): TimeRange = {
- info(s"load data [${name}]")
- val timestamp = context.contextId.timestamp
- val (dfOpt, timeRange) = data(timestamp)
- dfOpt match {
- case Some(df) => {
- context.runTimeTableRegister.registerTable(name, df)
- }
- case None => {
- warn(s"load data source [${name}] fails")
- }
- }
- timeRange
- }
-
- private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
- val batches = dataConnectors.flatMap { dc =>
- val (dfOpt, timeRange) = dc.data(timestamp)
- dfOpt match {
- case Some(df) => Some((dfOpt, timeRange))
- case _ => None
- }
- }
- val caches = streamingCacheClientOpt match {
- case Some(dsc) => dsc.readData() :: Nil
- case _ => Nil
- }
- val pairs = batches ++ caches
-
- if (pairs.size > 0) {
- pairs.reduce { (a, b) =>
- (unionDfOpts(a._1, b._1), a._2.merge(b._2))
- }
- } else {
- (None, TimeRange.emptyTimeRange)
- }
- }
-
- def updateData(df: DataFrame): Unit = {
- streamingCacheClientOpt.foreach(_.updateData(Some(df)))
- }
-
- def cleanOldData(): Unit = {
- streamingCacheClientOpt.foreach(_.cleanOutTimeData)
- }
-
- def processFinish(): Unit = {
- streamingCacheClientOpt.foreach(_.processFinish)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
deleted file mode 100644
index edd88b6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/DataSourceFactory.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.DataSourceParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClientFactory
-import org.apache.griffin.measure.context.datasource.connector.{DataConnector, DataConnectorFactory}
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-
-import scala.util.Success
-
-object DataSourceFactory extends Loggable {
-
- def getDataSources(sparkSession: SparkSession,
- ssc: StreamingContext,
- dataSources: Seq[DataSourceParam]
- ): Seq[DataSource] = {
- dataSources.zipWithIndex.flatMap { pair =>
- val (param, index) = pair
- getDataSource(sparkSession, ssc, param, index)
- }
- }
-
- private def getDataSource(sparkSession: SparkSession,
- ssc: StreamingContext,
- dataSourceParam: DataSourceParam,
- index: Int
- ): Option[DataSource] = {
- val name = dataSourceParam.getName
- val connectorParams = dataSourceParam.getConnectors
- val tmstCache = TmstCache()
-
- // for streaming data cache
- val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
- sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, tmstCache)
-
- val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
- DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
- tmstCache, streamingCacheClientOpt) match {
- case Success(connector) => Some(connector)
- case _ => None
- }
- }
-
- Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
deleted file mode 100644
index 1c3ca60..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClient.scala
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.{DataSourceCacheable, TmstCache}
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.utils.DataFrameUtil._
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
-import org.apache.spark.sql._
-
-import scala.util.Random
-
-/**
- * data source cache in streaming mode
- * save data frame into hdfs in dump phase
- * read data frame from hdfs in calculate phase
- * with update and clean actions for the cache data
- */
-trait StreamingCacheClient extends DataSourceCacheable with WithFanIn[Long] with Loggable with Serializable {
-
- val sqlContext: SQLContext
- val param: Map[String, Any]
- val dsName: String
- val index: Int
-
- val tmstCache: TmstCache
- protected def fromUntilRangeTmsts(from: Long, until: Long) = tmstCache.fromUntil(from, until)
- protected def clearTmst(t: Long) = tmstCache.remove(t)
- protected def clearTmstsUntil(until: Long) = {
- val outDateTmsts = tmstCache.until(until)
- tmstCache.remove(outDateTmsts)
- }
- protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
- protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
-
- val _FilePath = "file.path"
- val _InfoPath = "info.path"
- val _ReadyTimeInterval = "ready.time.interval"
- val _ReadyTimeDelay = "ready.time.delay"
- val _TimeRange = "time.range"
-
- val rdmStr = Random.alphanumeric.take(10).mkString
- val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
- val defInfoPath = s"${index}"
-
- val filePath: String = param.getString(_FilePath, defFilePath)
- val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
- val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
- val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
- val deltaTimeRange: (Long, Long) = {
- def negative(n: Long): Long = if (n <= 0) n else 0
- param.get(_TimeRange) match {
- case Some(seq: Seq[String]) => {
- val nseq = seq.flatMap(TimeUtil.milliseconds(_))
- val ns = negative(nseq.headOption.getOrElse(0))
- val ne = negative(nseq.tail.headOption.getOrElse(0))
- (ns, ne)
- }
- case _ => (0, 0)
- }
- }
-
- val _ReadOnly = "read.only"
- val readOnly = param.getBoolean(_ReadOnly, false)
-
- val _Updatable = "updatable"
- val updatable = param.getBoolean(_Updatable, false)
-
- val newCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.new")
- val oldCacheLock = InfoCacheInstance.genLock(s"${cacheInfoPath}.old")
-
- val newFilePath = s"${filePath}/new"
- val oldFilePath = s"${filePath}/old"
-
- val defOldCacheIndex = 0L
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
-
- /**
- * save data frame in dump phase
- * @param dfOpt data frame to be saved
- * @param ms timestamp of this data frame
- */
- def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
- if (!readOnly) {
- dfOpt match {
- case Some(df) => {
- // cache df
- df.cache
-
- // cache df
- val cnt = df.count
- info(s"save ${dsName} data count: ${cnt}")
-
- if (cnt > 0) {
- // lock makes it safer when writing new cache data
- val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
- if (newCacheLocked) {
- try {
- val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
- writeDataFrame(dfw, newFilePath)
- } catch {
- case e: Throwable => error(s"save data error: ${e.getMessage}")
- } finally {
- newCacheLock.unlock()
- }
- }
- }
-
- // uncache df
- df.unpersist
- }
- case _ => {
- info(s"no data frame to save")
- }
- }
-
- // submit cache time and ready time
- if (fanIncrement(ms)) {
- info(s"save data [${ms}] finish")
- submitCacheTime(ms)
- submitReadyTime(ms)
- }
-
- }
- }
-
- /**
- * read data frame in calculation phase
- * @return data frame to calculate, with the time range of data
- */
- def readData(): (Option[DataFrame], TimeRange) = {
- // time range: (a, b]
- val timeRange = TimeInfoCache.getTimeRange
- val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
-
- // read partition info
- val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
- info(s"read time range: [${reviseTimeRange._1}]")
- s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
- } else {
- info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
- s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
- }
-
- // new cache data
- val newDfOpt = try {
- val dfr = sqlContext.read
- Some(readDataFrame(dfr, newFilePath).filter(filterStr))
- } catch {
- case e: Throwable => {
- warn(s"read data source cache warn: ${e.getMessage}")
- None
- }
- }
-
- // old cache data
- val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
- val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
- val oldDfPath = s"${oldFilePath}/${idx}"
- try {
- val dfr = sqlContext.read
- Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
- } catch {
- case e: Throwable => {
- warn(s"read old data source cache warn: ${e.getMessage}")
- None
- }
- }
- }
-
- // whole cache data
- val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
-
- // from until tmst range
- val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
- val tmstSet = afterTilRangeTmsts(from, until)
-
- val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
- (cacheDfOpt, retTimeRange)
- }
-
- private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
- func: (Long, Long) => Boolean
- ): Unit = {
- val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func)
- // delete out time data path
- earlierOrEqPaths.foreach { path =>
- info(s"delete hdfs path: ${path}")
- HdfsUtil.deleteHdfsPath(path)
- }
- }
- private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
- func: (Long, Long) => Boolean
- ): Iterable[String] = {
- val names = HdfsUtil.listSubPathsByType(path, "dir")
- val regex = partitionOpt match {
- case Some(partition) => s"^${partition}=(\\d+)$$".r
- case _ => "^(\\d+)$".r
- }
- names.filter { name =>
- name match {
- case regex(value) => {
- str2Long(value) match {
- case Some(t) => func(t, bound)
- case _ => false
- }
- }
- case _ => false
- }
- }.map(name => s"${path}/${name}")
- }
- private def str2Long(str: String): Option[Long] = {
- try {
- Some(str.toLong)
- } catch {
- case e: Throwable => None
- }
- }
-
- /**
- * clean out-time cached data on hdfs
- */
- def cleanOutTimeData(): Unit = {
- // clean tmst
- val cleanTime = readCleanTime
- cleanTime.foreach(clearTmstsTil(_))
-
- if (!readOnly) {
- // new cache data
- val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
- newCacheCleanTime match {
- case Some(nct) => {
- // clean calculated new cache data
- val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
- if (newCacheLocked) {
- try {
- cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
- (a: Long, b: Long) => (a <= b))
- } catch {
- case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
- } finally {
- newCacheLock.unlock()
- }
- }
- }
- case _ => {
- // do nothing
- }
- }
-
- // old cache data
- val oldCacheCleanTime = if (updatable) readCleanTime else None
- oldCacheCleanTime match {
- case Some(oct) => {
- val oldCacheIndexOpt = readOldCacheIndex
- oldCacheIndexOpt.foreach { idx =>
- val oldDfPath = s"${oldFilePath}/${idx}"
- val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
- if (oldCacheLocked) {
- try {
- // clean calculated old cache data
- cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
- // clean out time old cache data not calculated
-// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
- } catch {
- case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
- } finally {
- oldCacheLock.unlock()
- }
- }
- }
- }
- case _ => {
- // do nothing
- }
- }
- }
- }
-
- /**
- * update old cached data by new data frame
- * @param dfOpt data frame to update old cached data
- */
- def updateData(dfOpt: Option[DataFrame]): Unit = {
- if (!readOnly && updatable) {
- dfOpt match {
- case Some(df) => {
- // old cache lock
- val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
- if (oldCacheLocked) {
- try {
- val oldCacheIndexOpt = readOldCacheIndex
- val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
-
- val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
- val cleanTime = getNextCleanTime
- val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
- val updateDf = df.filter(filterStr)
-
- val prlCount = sqlContext.sparkContext.defaultParallelism
- // repartition
- val repartitionedDf = updateDf.repartition(prlCount)
- val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
- writeDataFrame(dfw, oldDfPath)
-
- submitOldCacheIndex(nextOldCacheIndex)
- } catch {
- case e: Throwable => error(s"update data error: ${e.getMessage}")
- } finally {
- oldCacheLock.unlock()
- }
- }
- }
- case _ => {
- info(s"no data frame to update")
- }
- }
- }
- }
-
- /**
- * each time calculation phase finishes,
- * data source cache needs to submit some cache information
- */
- def processFinish(): Unit = {
- // next last proc time
- val timeRange = TimeInfoCache.getTimeRange
- submitLastProcTime(timeRange._2)
-
- // next clean time
- val nextCleanTime = timeRange._2 + deltaTimeRange._1
- submitCleanTime(nextCleanTime)
- }
-
- // read next clean time
- private def getNextCleanTime(): Long = {
- val timeRange = TimeInfoCache.getTimeRange
- val nextCleanTime = timeRange._2 + deltaTimeRange._1
- nextCleanTime
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
deleted file mode 100644
index fd9d231..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheClientFactory.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.ParamUtil._
-import org.apache.spark.sql.SQLContext
-
-object StreamingCacheClientFactory extends Loggable {
-
- private object DataSourceCacheType {
- val ParquetRegex = "^(?i)parq(uet)?$".r
- val JsonRegex = "^(?i)json$".r
- val OrcRegex = "^(?i)orc$".r
- }
- import DataSourceCacheType._
-
- val _type = "type"
-
- /**
- * create streaming cache client
- * @param sqlContext sqlContext in spark environment
- * @param cacheOpt data source cache config option
- * @param name data source name
- * @param index data source index
- * @param tmstCache the same tmstCache instance inside a data source
- * @return streaming cache client option
- */
- def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
- name: String, index: Int, tmstCache: TmstCache
- ): Option[StreamingCacheClient] = {
- cacheOpt.flatMap { param =>
- try {
- val tp = param.getString(_type, "")
- val dsCache = tp match {
- case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
- case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
- case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
- case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
- }
- Some(dsCache)
- } catch {
- case e: Throwable => {
- error(s"generate data source cache fails")
- None
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
deleted file mode 100644
index 494db3e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheJsonClient.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
- * data source cache in json format
- */
-case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, tmstCache: TmstCache
- ) extends StreamingCacheClient {
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
- info(s"write path: ${path}")
- dfw.json(path)
- }
-
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
- dfr.json(path)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
deleted file mode 100644
index 6e0f142..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheOrcClient.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
- * data source cache in orc format
- */
-case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, tmstCache: TmstCache
- ) extends StreamingCacheClient {
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
- info(s"write path: ${path}")
- dfw.orc(path)
- }
-
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
- dfr.orc(path)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
deleted file mode 100644
index d99bc58..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/StreamingCacheParquetClient.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql._
-
-/**
- * data source cache in parquet format
- */
-case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
- dsName: String, index: Int, tmstCache: TmstCache
- ) extends StreamingCacheClient {
-
- sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
-
- protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
- info(s"write path: ${path}")
- dfw.parquet(path)
- }
-
- protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
- dfr.parquet(path)
- }
-
-}
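The three cache clients above differ only in their two format hooks, so a new format slots into the same template. A minimal sketch, assuming StreamingCacheClient requires nothing beyond the constructor members and these two methods (the CSV variant is illustrative, not part of this commit):

    case class StreamingCacheCsvClient(sqlContext: SQLContext, param: Map[String, Any],
                                       dsName: String, index: Int, tmstCache: TmstCache
                                      ) extends StreamingCacheClient {
      // the format-specific hooks are the only difference between clients
      protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = dfw.csv(path)
      protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = dfr.csv(path)
    }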
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
deleted file mode 100644
index ebd2e55..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/cache/WithFanIn.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.cache
-
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.concurrent.{TrieMap, Map => ConcMap}
-
-/**
- * fan-in trait, for multiple inputs and one output,
- * to support multiple parallel data connectors in one data source
- */
-trait WithFanIn[T] {
-
- // total input number
- val totalNum: AtomicInteger = new AtomicInteger(0)
- // concurrent map of fan in count for each key
- val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
-
- def registerFanIn(): Int = {
- totalNum.incrementAndGet()
- }
-
- /**
- * increment the count for a key, to test whether all parallel inputs have finished
- * @param key fan-in key shared by the parallel inputs
- * @return true if all registered inputs have arrived for this key
- */
- def fanIncrement(key: T): Boolean = {
- fanInc(key)
- fanInCountMap.get(key) match {
- case Some(n) if (n >= totalNum.get) => {
- fanInCountMap.remove(key)
- true
- }
- case _ => false
- }
- }
-
- private def fanInc(key: T): Unit = {
- fanInCountMap.get(key) match {
- case Some(n) => {
- val suc = fanInCountMap.replace(key, n, n + 1)
- if (!suc) fanInc(key)
- }
- case _ => {
- val oldOpt = fanInCountMap.putIfAbsent(key, 1)
- if (oldOpt.nonEmpty) fanInc(key)
- }
- }
- }
-
-}
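To make the fan-in contract concrete, a small self-contained sketch: with two registered inputs, only the last increment for a key reports completion (the timestamp value is arbitrary):

    object FanInDemo extends WithFanIn[Long] {
      def main(args: Array[String]): Unit = {
        registerFanIn() // input 1
        registerFanIn() // input 2
        val key = 1528945200000L
        println(fanIncrement(key)) // false: 1 of 2 inputs arrived
        println(fanIncrement(key)) // true: 2 of 2 arrived, key removed from the map
      }
    }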
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
deleted file mode 100644
index a4c1995..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnector.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector
-
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType}
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
-import org.apache.griffin.measure.job.builder.DQJobBuilder
-import org.apache.griffin.measure.step.builder.ConstantColumns
-import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.spark.sql.functions._
-
-trait DataConnector extends Loggable with Serializable {
-
- @transient val sparkSession: SparkSession
-
- val dcParam: DataConnectorParam
-
- val id: String = DataConnectorIdGenerator.genId
- protected def thisName(suffix: String): String = s"this_${suffix}"
-
- val tmstCache: TmstCache
- protected def saveTmst(t: Long) = tmstCache.insert(t)
- protected def readTmst(t: Long) = tmstCache.fromUntil(t, t + 1)
-
- def init(): Unit
-
- // get data frame in batch mode
- def data(ms: Long): (Option[DataFrame], TimeRange)
-
- private def createContext(t: Long): DQContext = {
- DQContext(ContextId(t, id), id, Nil, Nil, BatchProcessType)(sparkSession)
- }
-
- def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
- // new context
- val context = createContext(ms)
-
- val timestamp = context.contextId.timestamp
- val suffix = context.contextId.id
- val thisTable = thisName(suffix)
-
- try {
- saveTmst(timestamp) // save timestamp
-
- dfOpt.flatMap { df =>
- val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
-
- // init data
- context.compileTableRegister.registerTable(thisTable)
- context.runTimeTableRegister.registerTable(thisTable, df)
-
- // build job
- val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
-
- // job execute
- preprocJob.execute(context)
-
- // out data
- val outDf = context.sparkSession.table(s"`${thisTable}`")
-
- // add tmst column
- val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
-
- // clean context
- context.clean()
-
- Some(withTmstDf)
- }
-
- } catch {
- case e: Throwable => {
- error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
- None
- }
- }
- }
-}
-
-object DataConnectorIdGenerator {
- private val counter: AtomicLong = new AtomicLong(0L)
- private val head: String = "dc"
-
- def genId: String = {
- s"${head}${increment}"
- }
-
- private def increment: Long = {
- counter.incrementAndGet()
- }
-}
\ No newline at end of file
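The tail of preProcess is the part most callers depend on: every batch is stamped with the context timestamp in a constant column. A minimal standalone sketch of that step; the column name "__tmst" is an assumption standing in for ConstantColumns.tmst:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.lit

    val spark = SparkSession.builder.master("local[2]").appName("tmst-sketch").getOrCreate()
    import spark.implicits._
    val df = Seq(("a", 1), ("b", 2)).toDF("name", "age")
    val timestamp = System.currentTimeMillis
    val withTmstDf = df.withColumn("__tmst", lit(timestamp)) // stamp the batch
    withTmstDf.show()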
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
deleted file mode 100644
index 4538fbb..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/DataConnectorFactory.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.datasource.connector.batch._
-import org.apache.griffin.measure.context.datasource.connector.streaming._
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.streaming.StreamingContext
-
-import scala.reflect.ClassTag
-import scala.util.Try
-
-object DataConnectorFactory extends Loggable {
-
- val HiveRegex = """^(?i)hive$""".r
- val AvroRegex = """^(?i)avro$""".r
- val TextDirRegex = """^(?i)text-dir$""".r
-
- val KafkaRegex = """^(?i)kafka$""".r
-
- /**
- * create data connector
- * @param sparkSession spark env
- * @param ssc spark streaming env
- * @param dcParam data connector param
- * @param tmstCache same tmst cache in one data source
- * @param streamingCacheClientOpt for streaming cache
- * @return data connector
- */
- def getDataConnector(sparkSession: SparkSession,
- ssc: StreamingContext,
- dcParam: DataConnectorParam,
- tmstCache: TmstCache,
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ): Try[DataConnector] = {
- val conType = dcParam.getType
- val version = dcParam.getVersion
- Try {
- conType match {
- case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
- case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
- case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
- case KafkaRegex() => {
- getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- }
- case _ => throw new Exception("connector creation error!")
- }
- }
- }
-
- private def getStreamingDataConnector(sparkSession: SparkSession,
- ssc: StreamingContext,
- dcParam: DataConnectorParam,
- tmstCache: TmstCache,
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ): StreamingDataConnector = {
- if (ssc == null) throw new Exception("streaming context is null!")
- val conType = dcParam.getType
- val version = dcParam.getVersion
- conType match {
- case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- case _ => throw new Exception("streaming connector creation error!")
- }
- }
-
- private def getKafkaDataConnector(sparkSession: SparkSession,
- ssc: StreamingContext,
- dcParam: DataConnectorParam,
- tmstCache: TmstCache,
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ): KafkaStreamingDataConnector = {
- val KeyType = "key.type"
- val ValueType = "value.type"
- val config = dcParam.config
- val keyType = config.getOrElse(KeyType, "java.lang.String").toString
- val valueType = config.getOrElse(ValueType, "java.lang.String").toString
- (getClassTag(keyType), getClassTag(valueType)) match {
- case (ClassTag(k: Class[String]), ClassTag(v: Class[String])) => {
- KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
- }
- case _ => {
- throw new Exception("not supported type kafka data connector")
- }
- }
- }
-
- private def getClassTag(tp: String): ClassTag[_] = {
- ClassTag(Class.forName(tp))
- }
-
-// def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
-// connectors.flatMap { dc =>
-// dc match {
-// case mdc: T => Some(mdc)
-// case _ => None
-// }
-// }
-// }
-
-}
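A hypothetical call into the factory for a batch source; dcParam and tmstCache come from the job configuration and the owning data source, and a null streaming context is acceptable here because it is only checked on the kafka path:

    import scala.util.{Failure, Success}

    val connectorTry = DataConnectorFactory.getDataConnector(
      sparkSession, null, dcParam, tmstCache, None)
    connectorTry match {
      case Success(dc) => dc.init()
      case Failure(ex) => // unmatched types surface here as a creation error
    }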
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
deleted file mode 100644
index a906246..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/AvroBatchDataConnector.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
- * batch data connector for avro file
- */
-case class AvroBatchDataConnector(@transient sparkSession: SparkSession,
- dcParam: DataConnectorParam,
- tmstCache: TmstCache
- ) extends BatchDataConnector {
-
- val config = dcParam.getConfig
-
- val FilePath = "file.path"
- val FileName = "file.name"
-
- val filePath = config.getString(FilePath, "")
- val fileName = config.getString(FileName, "")
-
- val concreteFileFullPath = if (pathPrefix) s"${filePath}${fileName}" else fileName
-
- private def pathPrefix(): Boolean = {
- filePath.nonEmpty
- }
-
- private def fileExist(): Boolean = {
- HdfsUtil.existPath(concreteFileFullPath)
- }
-
- def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val df = sparkSession.read.format("com.databricks.spark.avro").load(concreteFileFullPath)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable => {
- error(s"load avro file ${concreteFileFullPath} fails")
- None
- }
- }
- val tmsts = readTmst(ms)
- (dfOpt, TimeRange(ms, tmsts))
- }
-
-
-}
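Note that concreteFileFullPath is a plain string concatenation, so file.path must carry its trailing separator. An illustrative config map (the HDFS paths are assumptions):

    val avroConfig = Map[String, Any](
      "file.path" -> "hdfs:///griffin/data/", // trailing "/" needed: values are concatenated as-is
      "file.name" -> "users.avro"
    )
    // concreteFileFullPath => "hdfs:///griffin/data/users.avro"
    // with an empty "file.path", "file.name" alone is used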
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
deleted file mode 100644
index 8f32687..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/BatchDataConnector.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.context.datasource.connector.DataConnector
-
-trait BatchDataConnector extends DataConnector {
-
- def init(): Unit = {}
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
deleted file mode 100644
index 331f469..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/HiveBatchDataConnector.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
- * batch data connector for hive table
- */
-case class HiveBatchDataConnector(@transient sparkSession: SparkSession,
- dcParam: DataConnectorParam,
- tmstCache: TmstCache
- ) extends BatchDataConnector {
-
- val config = dcParam.getConfig
-
- val Database = "database"
- val TableName = "table.name"
- val Where = "where"
-
- val database = config.getString(Database, "default")
- val tableName = config.getString(TableName, "")
- val whereString = config.getString(Where, "")
-
- val concreteTableName = s"${database}.${tableName}"
- val wheres = whereString.split(",").map(_.trim).filter(_.nonEmpty)
-
- def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val dtSql = dataSql
- info(dtSql)
- val df = sparkSession.sql(dtSql)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } catch {
- case e: Throwable => {
- error(s"load hive table ${concreteTableName} fails: ${e.getMessage}")
- None
- }
- }
- val tmsts = readTmst(ms)
- (dfOpt, TimeRange(ms, tmsts))
- }
-
-
- private def tableExistsSql(): String = {
-// s"SHOW TABLES LIKE '${concreteTableName}'" // this is hive sql, but not work for spark sql
- s"tableName LIKE '${tableName}'"
- }
-
- private def metaDataSql(): String = {
- s"DESCRIBE ${concreteTableName}"
- }
-
- private def dataSql(): String = {
- val tableClause = s"SELECT * FROM ${concreteTableName}"
- if (wheres.length > 0) {
- val clauses = wheres.map { w =>
- s"${tableClause} WHERE ${w}"
- }
- clauses.mkString(" UNION ALL ")
- } else tableClause
- }
-
-}
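A worked example of dataSql() under assumed config values; since wheres splits on commas, each comma-separated segment becomes its own WHERE clause, and a literal comma inside one clause is not supported:

    val tableClause = "SELECT * FROM default.users"
    val wheres = "dt='20180614' AND hour='09', dt='20180614' AND hour='10'"
      .split(",").map(_.trim).filter(_.nonEmpty)
    val sql = wheres.map(w => s"${tableClause} WHERE ${w}").mkString(" UNION ALL ")
    // => SELECT * FROM default.users WHERE dt='20180614' AND hour='09'
    //    UNION ALL SELECT * FROM default.users WHERE dt='20180614' AND hour='10'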
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
deleted file mode 100644
index bc76f9d..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/batch/TextDirBatchDataConnector.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.batch
-
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.griffin.measure.utils.HdfsUtil
-import org.apache.spark.sql.{DataFrame, SparkSession}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
- * batch data connector for a directory of text format data in sub-directories at the nth depth
- */
-case class TextDirBatchDataConnector(@transient sparkSession: SparkSession,
- dcParam: DataConnectorParam,
- tmstCache: TmstCache
- ) extends BatchDataConnector {
-
- val config = dcParam.getConfig
-
- val DirPath = "dir.path"
- val DataDirDepth = "data.dir.depth"
- val SuccessFile = "success.file"
- val DoneFile = "done.file"
-
- val dirPath = config.getString(DirPath, "")
- val dataDirDepth = config.getInt(DataDirDepth, 0)
- val successFile = config.getString(SuccessFile, "_SUCCESS")
- val doneFile = config.getString(DoneFile, "_DONE")
-
- val ignoreFilePrefix = "_"
-
- private def dirExist(): Boolean = {
- HdfsUtil.existPath(dirPath)
- }
-
- def data(ms: Long): (Option[DataFrame], TimeRange) = {
- val dfOpt = try {
- val dataDirs = listSubDirs(dirPath :: Nil, dataDirDepth, readable)
- // touch done file for read dirs
- dataDirs.foreach(dir => touchDone(dir))
-
- val validDataDirs = dataDirs.filter(dir => !emptyDir(dir))
-
- if (validDataDirs.nonEmpty) {
- val df = sparkSession.read.text(validDataDirs: _*)
- val dfOpt = Some(df)
- val preDfOpt = preProcess(dfOpt, ms)
- preDfOpt
- } else {
- None
- }
- } catch {
- case e: Throwable => {
- error(s"load text dir ${dirPath} fails: ${e.getMessage}")
- None
- }
- }
- val tmsts = readTmst(ms)
- (dfOpt, TimeRange(ms, tmsts))
- }
-
- private def listSubDirs(paths: Seq[String], depth: Int, filterFunc: (String) => Boolean): Seq[String] = {
- val subDirs = paths.flatMap { path => HdfsUtil.listSubPathsByType(path, "dir", true) }
- if (depth <= 0) {
- subDirs.filter(filterFunc)
- } else {
- listSubDirs(subDirs, depth - 1, filterFunc)
- }
- }
-
- private def readable(dir: String): Boolean = isSuccess(dir) && !isDone(dir)
- private def isDone(dir: String): Boolean = HdfsUtil.existFileInDir(dir, doneFile)
- private def isSuccess(dir: String): Boolean = HdfsUtil.existFileInDir(dir, successFile)
-
- private def touchDone(dir: String): Unit = HdfsUtil.createEmptyFile(HdfsUtil.getHdfsFilePath(dir, doneFile))
-
- private def emptyDir(dir: String): Boolean = {
- HdfsUtil.listSubPathsByType(dir, "file").filterNot(_.startsWith(ignoreFilePrefix)).isEmpty
- }
-
-// def metaData(): Try[Iterable[(String, String)]] = {
-// Try {
-// val st = sqlContext.read.format("com.databricks.spark.avro").load(concreteFileFullPath).schema
-// st.fields.map(f => (f.name, f.dataType.typeName))
-// }
-// }
-
-}
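An illustrative layout for dir.path = "hdfs:///griffin/text" with data.dir.depth = 1 (the paths are assumptions): a leaf directory is read only when it holds the success file and not yet the done file, and the done file is touched after reading:

    // hdfs:///griffin/text/20180614/_SUCCESS    -> readable, _DONE touched after read
    // hdfs:///griffin/text/20180614/part-00000
    // hdfs:///griffin/text/20180613/_SUCCESS
    // hdfs:///griffin/text/20180613/_DONE       -> skipped: already consumed
    // hdfs:///griffin/text/20180612/part-00000  -> skipped: no _SUCCESS marker yet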
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
deleted file mode 100644
index 0f30b7f..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingDataConnector.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.streaming
-
-import kafka.serializer.Decoder
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.{Failure, Success, Try}
-import org.apache.griffin.measure.utils.ParamUtil._
-
-/**
- * streaming data connector for kafka
- */
-trait KafkaStreamingDataConnector extends StreamingDataConnector {
-
- type KD <: Decoder[K]
- type VD <: Decoder[V]
- type OUT = (K, V)
-
- val config = dcParam.getConfig
-
- val KafkaConfig = "kafka.config"
- val Topics = "topics"
-
- val kafkaConfig = config.getAnyRef(KafkaConfig, Map[String, String]())
- val topics = config.getString(Topics, "")
-
- def init(): Unit = {
- // register fan in
- streamingCacheClientOpt.foreach(_.registerFanIn)
-
- val ds = stream match {
- case Success(dstream) => dstream
- case Failure(ex) => throw ex
- }
- ds.foreachRDD((rdd, time) => {
- val ms = time.milliseconds
- val saveDfOpt = try {
- // coalesce partition number
- val prlCount = rdd.sparkContext.defaultParallelism
- val ptnCount = rdd.getNumPartitions
- val repartitionedRdd = if (prlCount < ptnCount) {
- rdd.coalesce(prlCount)
- } else rdd
-
- val dfOpt = transform(repartitionedRdd)
-
- // pre-process
- preProcess(dfOpt, ms)
- } catch {
- case e: Throwable => {
- error(s"streaming data connector error: ${e.getMessage}")
- None
- }
- }
-
- // save data frame
- streamingCacheClientOpt.foreach(_.saveData(saveDfOpt, ms))
- })
- }
-
- def stream(): Try[InputDStream[OUT]] = Try {
- val topicSet = topics.split(",").toSet
- createDStream(topicSet)
- }
-
- protected def createDStream(topicSet: Set[String]): InputDStream[OUT]
-
-}
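The coalesce step in init() only ever shrinks a batch down to the default parallelism, never repartitions upward; the decision in isolation:

    def coalescePlan(defaultParallelism: Int, rddPartitions: Int): Int =
      if (defaultParallelism < rddPartitions) defaultParallelism else rddPartitions

    assert(coalescePlan(8, 32) == 8) // 32 partitions coalesced to 8
    assert(coalescePlan(8, 4) == 4)  // fewer partitions than cores: left as-is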
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
deleted file mode 100644
index 3083ca6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/KafkaStreamingStringDataConnector.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.streaming
-
-import kafka.serializer.StringDecoder
-import org.apache.griffin.measure.configuration.params.DataConnectorParam
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.datasource.info.TmstCache
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{StringType, StructField, StructType}
-import org.apache.spark.sql.{Row, _}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.dstream.InputDStream
-import org.apache.spark.streaming.kafka.KafkaUtils
-
-/**
- * streaming data connector for kafka with string format key and value
- */
-case class KafkaStreamingStringDataConnector(@transient sparkSession: SparkSession,
- @transient ssc: StreamingContext,
- dcParam: DataConnectorParam,
- tmstCache: TmstCache,
- streamingCacheClientOpt: Option[StreamingCacheClient]
- ) extends KafkaStreamingDataConnector {
-
- type K = String
- type KD = StringDecoder
- type V = String
- type VD = StringDecoder
-
- val valueColName = "value"
- val schema = StructType(Array(
- StructField(valueColName, StringType)
- ))
-
- def createDStream(topicSet: Set[String]): InputDStream[OUT] = {
- KafkaUtils.createDirectStream[K, V, KD, VD](ssc, kafkaConfig, topicSet)
- }
-
- def transform(rdd: RDD[OUT]): Option[DataFrame] = {
- if (rdd.isEmpty) None else {
- try {
- val rowRdd = rdd.map(d => Row(d._2))
- val df = sparkSession.createDataFrame(rowRdd, schema)
- Some(df)
- } catch {
- case e: Throwable => {
- error(s"streaming data transform fails")
- None
- }
- }
- }
- }
-
-}
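An illustrative config map for this string connector; the keys follow the constants in KafkaStreamingDataConnector, while the broker address, group id and topic names are assumptions (the old 0.8 direct-stream API is configured with a broker list in this style):

    val kafkaDcConfig = Map[String, Any](
      "kafka.config" -> Map(
        "metadata.broker.list" -> "localhost:9092",
        "group.id" -> "griffin"
      ),
      "topics" -> "events,clicks" // split on "," into the topic set for createDStream
    )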
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
deleted file mode 100644
index 737bc21..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/connector/streaming/StreamingDataConnector.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.connector.streaming
-
-import org.apache.griffin.measure.context.TimeRange
-import org.apache.griffin.measure.context.datasource.cache.StreamingCacheClient
-import org.apache.griffin.measure.context.datasource.connector.DataConnector
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.streaming.dstream.InputDStream
-
-import scala.util.Try
-
-trait StreamingDataConnector extends DataConnector {
-
- type K
- type V
- type OUT
-
- protected def stream(): Try[InputDStream[OUT]]
-
- // transform rdd to dataframe
- def transform(rdd: RDD[OUT]): Option[DataFrame]
-
- // streaming data connector cannot directly read data frame
- def data(ms: Long): (Option[DataFrame], TimeRange) = (None, TimeRange.emptyTimeRange)
-
- val streamingCacheClientOpt: Option[StreamingCacheClient]
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
deleted file mode 100644
index f721a1e..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/DataSourceCacheable.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.info
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.info.{InfoCacheInstance, TimeInfoCache}
-
-/**
- * timestamp info of data source cache
- */
-trait DataSourceCacheable extends Loggable with Serializable {
-
- val cacheInfoPath: String
- val readyTimeInterval: Long
- val readyTimeDelay: Long
-
- def selfCacheInfoPath = s"${TimeInfoCache.infoPath}/${cacheInfoPath}"
-
- def selfCacheTime = TimeInfoCache.cacheTime(selfCacheInfoPath)
- def selfLastProcTime = TimeInfoCache.lastProcTime(selfCacheInfoPath)
- def selfReadyTime = TimeInfoCache.readyTime(selfCacheInfoPath)
- def selfCleanTime = TimeInfoCache.cleanTime(selfCacheInfoPath)
- def selfOldCacheIndex = TimeInfoCache.oldCacheIndex(selfCacheInfoPath)
-
- protected def submitCacheTime(ms: Long): Unit = {
- val map = Map[String, String]((selfCacheTime -> ms.toString))
- InfoCacheInstance.cacheInfo(map)
- }
-
- protected def submitReadyTime(ms: Long): Unit = {
- val curReadyTime = ms - readyTimeDelay
- if (curReadyTime % readyTimeInterval == 0) {
- val map = Map[String, String]((selfReadyTime -> curReadyTime.toString))
- InfoCacheInstance.cacheInfo(map)
- }
- }
-
- protected def submitLastProcTime(ms: Long): Unit = {
- val map = Map[String, String]((selfLastProcTime -> ms.toString))
- InfoCacheInstance.cacheInfo(map)
- }
-
- protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
-
- protected def submitCleanTime(ms: Long): Unit = {
- val cleanTime = genCleanTime(ms)
- val map = Map[String, String]((selfCleanTime -> cleanTime.toString))
- InfoCacheInstance.cacheInfo(map)
- }
-
- protected def genCleanTime(ms: Long): Long = ms
-
- protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
-
- protected def submitOldCacheIndex(index: Long): Unit = {
- val map = Map[String, String]((selfOldCacheIndex -> index.toString))
- InfoCacheInstance.cacheInfo(map)
- }
-
- def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
-
- private def readSelfInfo(key: String): Option[Long] = {
- InfoCacheInstance.readInfo(key :: Nil).get(key).flatMap { v =>
- try {
- Some(v.toLong)
- } catch {
- case _: Throwable => None
- }
- }
- }
-
-}
\ No newline at end of file
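The modulo guard in submitReadyTime means ready times are only published on the interval grid. A worked example with assumed settings:

    val readyTimeInterval = 300000L        // 5 minutes
    val readyTimeDelay = 60000L            // 1 minute
    val ms = 1528945260000L
    val curReadyTime = ms - readyTimeDelay // 1528945200000
    val published = curReadyTime % readyTimeInterval == 0 // true: cached
    // any ms whose delayed value falls off the grid is silently skipped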
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
deleted file mode 100644
index 4b1c410..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/datasource/info/TmstCache.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.datasource.info
-
-import org.apache.griffin.measure.Loggable
-
-import scala.collection.mutable.{SortedSet => MutableSortedSet}
-
-/**
- * tmst cache, CRUD of timestamps
- */
-case class TmstCache() extends Loggable {
-
- private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
-
- //-- insert tmst into tmst group --
- def insert(tmst: Long) = tmstGroup += tmst
- def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
-
- //-- remove tmst from tmst group --
- def remove(tmst: Long) = tmstGroup -= tmst
- def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
-
- //-- get subset of tmst group --
- def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
- def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
- def until(until: Long) = tmstGroup.until(until).toSet
- def from(from: Long) = tmstGroup.from(from).toSet
- def all = tmstGroup.toSet
-
-}
\ No newline at end of file
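The subset accessors mix half-open and half-closed conventions worth pinning down; a self-contained check:

    val cache = TmstCache()
    cache.insert(Seq(10L, 20L, 30L))
    assert(cache.fromUntil(10, 30) == Set(10L, 20L)) // [from, until)
    assert(cache.afterTil(10, 30) == Set(20L, 30L))  // (after, til]
    assert(cache.until(30) == Set(10L, 20L))
    assert(cache.from(20) == Set(20L, 30L))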
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
deleted file mode 100644
index e1d498b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCache.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.Loggable
-import org.apache.griffin.measure.context.streaming.lock.CacheLock
-
-trait InfoCache extends Loggable with Serializable {
-
- def init(): Unit
- def available(): Boolean
- def close(): Unit
-
- def cacheInfo(info: Map[String, String]): Boolean
- def readInfo(keys: Iterable[String]): Map[String, String]
- def deleteInfo(keys: Iterable[String]): Unit
- def clearInfo(): Unit
-
- def listKeys(path: String): List[String]
-
- def genLock(s: String): CacheLock
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
deleted file mode 100644
index 28ade3b..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheFactory.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.configuration.params.InfoCacheParam
-
-import scala.util.{Success, Try}
-
-case class InfoCacheFactory(infoCacheParams: Iterable[InfoCacheParam], metricName: String) extends Serializable {
-
- val ZK_REGEX = """^(?i)zk|zookeeper$""".r
-
- def getInfoCache(infoCacheParam: InfoCacheParam): Option[InfoCache] = {
- val config = infoCacheParam.getConfig
- val infoCacheTry = infoCacheParam.getType match {
- case ZK_REGEX() => Try(ZKInfoCache(config, metricName))
- case _ => throw new Exception("not supported info cache type")
- }
- infoCacheTry match {
- case Success(infoCache) => Some(infoCache)
- case _ => None
- }
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
deleted file mode 100644
index 198f157..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/InfoCacheInstance.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.configuration.params.InfoCacheParam
-import org.apache.griffin.measure.context.streaming.lock.{CacheLock, MultiCacheLock}
-
-object InfoCacheInstance extends InfoCache {
- var infoCaches: List[InfoCache] = Nil
-
- def initInstance(infoCacheParams: Iterable[InfoCacheParam], metricName: String) = {
- val fac = InfoCacheFactory(infoCacheParams, metricName)
- infoCaches = infoCacheParams.flatMap(param => fac.getInfoCache(param)).toList
- }
-
- def init(): Unit = infoCaches.foreach(_.init)
- def available(): Boolean = infoCaches.foldLeft(false)(_ || _.available)
- def close(): Unit = infoCaches.foreach(_.close)
-
- def cacheInfo(info: Map[String, String]): Boolean = {
- infoCaches.foldLeft(false) { (res, infoCache) => res || infoCache.cacheInfo(info) }
- }
- def readInfo(keys: Iterable[String]): Map[String, String] = {
- val maps = infoCaches.map(_.readInfo(keys)).reverse
- maps.fold(Map[String, String]())(_ ++ _)
- }
- def deleteInfo(keys: Iterable[String]): Unit = infoCaches.foreach(_.deleteInfo(keys))
- def clearInfo(): Unit = infoCaches.foreach(_.clearInfo)
-
- def listKeys(path: String): List[String] = {
- infoCaches.foldLeft(Nil: List[String]) { (res, infoCache) =>
- if (res.size > 0) res else infoCache.listKeys(path)
- }
- }
-
- def genLock(s: String): CacheLock = MultiCacheLock(infoCaches.map(_.genLock(s)))
-}
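The composite semantics in brief: writes go to every backing cache, reads prefer earlier caches, and listKeys takes the first non-empty answer. The readInfo merge order in isolation:

    val maps = List(Map("k" -> "from-cache-1"), Map("k" -> "from-cache-2"))
    // reversed, then merged left to right, so the head cache wins on conflicts
    val merged = maps.reverse.fold(Map[String, String]())(_ ++ _)
    assert(merged("k") == "from-cache-1")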
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
deleted file mode 100644
index b1e6764..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/TimeInfoCache.scala
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.griffin.measure.Loggable
-
-object TimeInfoCache extends Loggable with Serializable {
-
- private val CacheTime = "cache.time"
- private val LastProcTime = "last.proc.time"
- private val ReadyTime = "ready.time"
- private val CleanTime = "clean.time"
- private val OldCacheIndex = "old.cache.index"
-
- def cacheTime(path: String): String = s"${path}/${CacheTime}"
- def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
- def readyTime(path: String): String = s"${path}/${ReadyTime}"
- def cleanTime(path: String): String = s"${path}/${CleanTime}"
- def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
-
- val infoPath = "info"
-
- val finalCacheInfoPath = "info.final"
- val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
- val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
- val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
-
- def startTimeInfoCache(): Unit = {
- genFinalReadyTime
- }
-
- def getTimeRange(): (Long, Long) = {
- readTimeRange
- }
-
- def getCleanTime(): Long = {
- readCleanTime
- }
-
- def endTimeInfoCache: Unit = {
- genFinalLastProcTime
- genFinalCleanTime
- }
-
- private def genFinalReadyTime(): Unit = {
- val subPath = InfoCacheInstance.listKeys(infoPath)
- val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
- val result = InfoCacheInstance.readInfo(keys)
- val times = keys.flatMap { k =>
- getLongOpt(result, k)
- }
- if (times.nonEmpty) {
- val time = times.min
- val map = Map[String, String]((finalReadyTime -> time.toString))
- InfoCacheInstance.cacheInfo(map)
- }
- }
-
- private def genFinalLastProcTime(): Unit = {
- val subPath = InfoCacheInstance.listKeys(infoPath)
- val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
- val result = InfoCacheInstance.readInfo(keys)
- val times = keys.flatMap { k =>
- getLongOpt(result, k)
- }
- if (times.nonEmpty) {
- val time = times.min
- val map = Map[String, String]((finalLastProcTime -> time.toString))
- InfoCacheInstance.cacheInfo(map)
- }
- }
-
- private def genFinalCleanTime(): Unit = {
- val subPath = InfoCacheInstance.listKeys(infoPath)
- val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
- val result = InfoCacheInstance.readInfo(keys)
- val times = keys.flatMap { k =>
- getLongOpt(result, k)
- }
- if (times.nonEmpty) {
- val time = times.min
- val map = Map[String, String]((finalCleanTime -> time.toString))
- InfoCacheInstance.cacheInfo(map)
- }
- }
-
- private def readTimeRange(): (Long, Long) = {
- val map = InfoCacheInstance.readInfo(List(finalLastProcTime, finalReadyTime))
- val lastProcTime = getLong(map, finalLastProcTime)
- val curReadyTime = getLong(map, finalReadyTime)
- (lastProcTime, curReadyTime)
- }
-
- private def readCleanTime(): Long = {
- val map = InfoCacheInstance.readInfo(List(finalCleanTime))
- val cleanTime = getLong(map, finalCleanTime)
- cleanTime
- }
-
- private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
- try {
- map.get(key).map(_.toLong)
- } catch {
- case e: Throwable => None
- }
- }
- private def getLong(map: Map[String, String], key: String) = {
- getLongOpt(map, key).getOrElse(-1L)
- }
-
-}
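Each of the gen* methods takes the minimum across all sources, so the global processing window only advances once the slowest source has caught up; in miniature:

    val sourceReadyTimes = Seq(1528945500000L, 1528945200000L) // per-source ready times
    val finalReadyTime = sourceReadyTimes.min // 1528945200000: slowest source wins
    // getTimeRange() then yields (finalLastProcTime, finalReadyTime)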
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
deleted file mode 100644
index 42166ae..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/info/ZKInfoCache.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.info
-
-import org.apache.curator.framework.imps.CuratorFrameworkState
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
-import org.apache.curator.retry.ExponentialBackoffRetry
-import org.apache.curator.utils.ZKPaths
-import org.apache.griffin.measure.context.streaming.lock.ZKCacheLock
-import org.apache.zookeeper.CreateMode
-
-import scala.collection.JavaConverters._
-
-/**
- * leverage zookeeper for info cache
- * @param config config map for the zookeeper client, e.g. hosts and namespace
- * @param metricName metric name, used to build the cache namespace
- */
-case class ZKInfoCache(config: Map[String, Any], metricName: String) extends InfoCache {
-
- val Hosts = "hosts"
- val Namespace = "namespace"
- val Mode = "mode"
- val InitClear = "init.clear"
- val CloseClear = "close.clear"
- val LockPath = "lock.path"
-
- val PersistRegex = """^(?i)persist$""".r
- val EphemeralRegex = """^(?i)ephemeral$""".r
-
- final val separator = ZKPaths.PATH_SEPARATOR
-
- val hosts = config.getOrElse(Hosts, "").toString
- val namespace = config.getOrElse(Namespace, "").toString
- val mode: CreateMode = config.get(Mode) match {
- case Some(s: String) => s match {
- case PersistRegex() => CreateMode.PERSISTENT
- case EphemeralRegex() => CreateMode.EPHEMERAL
- case _ => CreateMode.PERSISTENT
- }
- case _ => CreateMode.PERSISTENT
- }
- val initClear = config.get(InitClear) match {
- case Some(b: Boolean) => b
- case _ => true
- }
- val closeClear = config.get(CloseClear) match {
- case Some(b: Boolean) => b
- case _ => false
- }
- val lockPath = config.getOrElse(LockPath, "lock").toString
-
- private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
- private val builder = CuratorFrameworkFactory.builder()
- .connectString(hosts)
- .retryPolicy(new ExponentialBackoffRetry(1000, 3))
- .namespace(cacheNamespace)
- private val client: CuratorFramework = builder.build
-
- def init(): Unit = {
- client.start()
- info("start zk info cache")
- client.usingNamespace(cacheNamespace)
- info(s"init with namespace: ${cacheNamespace}")
- deleteInfo(lockPath :: Nil)
- if (initClear) {
- clearInfo
- }
- }
-
- def available(): Boolean = {
- client.getState match {
- case CuratorFrameworkState.STARTED => true
- case _ => false
- }
- }
-
- def close(): Unit = {
- if (closeClear) {
- clearInfo
- }
- info("close zk info cache")
- client.close()
- }
-
- def cacheInfo(info: Map[String, String]): Boolean = {
- info.foldLeft(true) { (rs, pair) =>
- val (k, v) = pair
- createOrUpdate(path(k), v) && rs
- }
- }
-
- def readInfo(keys: Iterable[String]): Map[String, String] = {
- keys.flatMap { key =>
- read(path(key)) match {
- case Some(v) => Some((key, v))
- case _ => None
- }
- }.toMap
- }
-
- def deleteInfo(keys: Iterable[String]): Unit = {
- keys.foreach { key => delete(path(key)) }
- }
-
- def clearInfo(): Unit = {
-// delete("/")
- deleteInfo(TimeInfoCache.finalCacheInfoPath :: Nil)
- deleteInfo(TimeInfoCache.infoPath :: Nil)
- info("clear info")
- }
-
- def listKeys(p: String): List[String] = {
- children(path(p))
- }
-
- def genLock(s: String): ZKCacheLock = {
- val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
- ZKCacheLock(new InterProcessMutex(client, lpt))
- }
-
- private def path(k: String): String = {
- if (k.startsWith(separator)) k else separator + k
- }
-
- private def children(path: String): List[String] = {
- try {
- client.getChildren().forPath(path).asScala.toList
- } catch {
- case e: Throwable => {
- warn(s"list ${path} warn: ${e.getMessage}")
- Nil
- }
- }
- }
-
- private def createOrUpdate(path: String, content: String): Boolean = {
- if (checkExists(path)) {
- update(path, content)
- } else {
- create(path, content)
- }
- }
-
- private def create(path: String, content: String): Boolean = {
- try {
- client.create().creatingParentsIfNeeded().withMode(mode)
- .forPath(path, content.getBytes("utf-8"))
- true
- } catch {
- case e: Throwable => {
- error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
- false
- }
- }
- }
-
- private def update(path: String, content: String): Boolean = {
- try {
- client.setData().forPath(path, content.getBytes("utf-8"))
- true
- } catch {
- case e: Throwable => {
- error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
- false
- }
- }
- }
-
- private def read(path: String): Option[String] = {
- try {
- Some(new String(client.getData().forPath(path), "utf-8"))
- } catch {
- case e: Throwable => {
- warn(s"read ${path} warn: ${e.getMessage}")
- None
- }
- }
- }
-
- private def delete(path: String): Unit = {
- try {
- client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
- } catch {
- case e: Throwable => error(s"delete ${path} error: ${e.getMessage}")
- }
- }
-
- private def checkExists(path: String): Boolean = {
- try {
- client.checkExists().forPath(path) != null
- } catch {
- case e: Throwable => {
- warn(s"check exists ${path} warn: ${e.getMessage}")
- false
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
new file mode 100644
index 0000000..5ae4c75
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockInZK.scala
@@ -0,0 +1,53 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.lock
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+
+case class CacheLockInZK(@transient mutex: InterProcessMutex) extends CacheLock {
+
+ def lock(outtime: Long, unit: TimeUnit): Boolean = {
+ try {
+ if (outtime >= 0) {
+ mutex.acquire(outtime, unit)
+ } else {
+ mutex.acquire(-1, null)
+ }
+ } catch {
+ case e: Throwable => {
+ error(s"lock error: ${e.getMessage}")
+ false
+ }
+ }
+
+ }
+
+ def unlock(): Unit = {
+ try {
+ if (mutex.isAcquiredInThisProcess) mutex.release
+ } catch {
+ case e: Throwable => {
+ error(s"unlock error: ${e.getMessage}")
+ }
+ }
+ }
+
+}
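For reference, a minimal usage sketch of this lock; the connect string and lock path below are illustrative, not part of this change:

    import java.util.concurrent.TimeUnit
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.framework.recipes.locks.InterProcessMutex
    import org.apache.curator.retry.ExponentialBackoffRetry

    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()
    val lock = CacheLockInZK(new InterProcessMutex(client, "/griffin/lock/demo"))
    if (lock.lock(5, TimeUnit.SECONDS)) {   // wait up to 5s; a negative outtime blocks
      try { /* critical section */ } finally { lock.unlock() }
    }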
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
new file mode 100644
index 0000000..6f033bd
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/CacheLockSeq.scala
@@ -0,0 +1,33 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.lock
+
+import java.util.concurrent.TimeUnit
+
+case class CacheLockSeq(cacheLocks: Seq[CacheLock]) extends CacheLock {
+
+ def lock(outtime: Long, unit: TimeUnit): Boolean = {
+ cacheLocks.headOption.map(_.lock(outtime, unit)).getOrElse(true)
+ }
+
+ def unlock(): Unit = {
+ cacheLocks.headOption.foreach(_.unlock)
+ }
+
+}
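Only the head of the sequence actually guards; an empty sequence degrades to a no-op lock that always succeeds. Illustrative check:

    import java.util.concurrent.TimeUnit
    val noLocks = CacheLockSeq(Nil)
    assert(noLocks.lock(1, TimeUnit.SECONDS))   // no underlying lock => true
    noLocks.unlock()                            // no-op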
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
deleted file mode 100644
index 6eea0b6..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/MultiCacheLock.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-case class MultiCacheLock(cacheLocks: List[CacheLock]) extends CacheLock {
-
- def lock(outtime: Long, unit: TimeUnit): Boolean = {
- cacheLocks.headOption match {
- case Some(cl) => cl.lock(outtime, unit)
- case None => true
- }
- }
-
- def unlock(): Unit = {
- cacheLocks.headOption match {
- case Some(cl) => cl.unlock
- case None => {}
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
deleted file mode 100644
index 2c9b717..0000000
--- a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/lock/ZKCacheLock.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.griffin.measure.context.streaming.lock
-
-import java.util.concurrent.TimeUnit
-
-import org.apache.curator.framework.recipes.locks.InterProcessMutex
-
-case class ZKCacheLock(@transient mutex: InterProcessMutex) extends CacheLock {
-
- def lock(outtime: Long, unit: TimeUnit): Boolean = {
- try {
- if (outtime >= 0) {
- mutex.acquire(outtime, unit)
- } else {
- mutex.acquire(-1, null)
- }
- } catch {
- case e: Throwable => {
- error(s"lock error: ${e.getMessage}")
- false
- }
- }
-
- }
-
- def unlock(): Unit = {
- try {
- if (mutex.isAcquiredInThisProcess) mutex.release
- } catch {
- case e: Throwable => {
- error(s"unlock error: ${e.getMessage}")
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
new file mode 100644
index 0000000..fc78eda
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCache.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.lock.CacheLock
+
+trait OffsetCache extends Loggable with Serializable {
+
+ def init(): Unit
+ def available(): Boolean
+ def close(): Unit
+
+ def cache(kvs: Map[String, String]): Unit
+ def read(keys: Iterable[String]): Map[String, String]
+ def delete(keys: Iterable[String]): Unit
+ def clear(): Unit
+
+ def listKeys(path: String): List[String]
+
+ def genLock(s: String): CacheLock
+
+}
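To make the contract concrete, a hypothetical in-memory implementation (test-only sketch, not part of this change; it assumes CacheLock declares just lock/unlock, as the implementations above do):

    import java.util.concurrent.TimeUnit
    import scala.collection.mutable
    import org.apache.griffin.measure.context.streaming.lock.CacheLock

    class InMemoryOffsetCache extends OffsetCache {
      private val store = mutable.Map[String, String]()
      def init(): Unit = ()
      def available(): Boolean = true
      def close(): Unit = ()
      def cache(kvs: Map[String, String]): Unit = store ++= kvs
      def read(keys: Iterable[String]): Map[String, String] =
        keys.flatMap(k => store.get(k).map((k, _))).toMap
      def delete(keys: Iterable[String]): Unit = keys.foreach(store.remove)
      def clear(): Unit = store.clear()
      def listKeys(path: String): List[String] =
        store.keys.filter(_.startsWith(path)).toList
      def genLock(s: String): CacheLock = new CacheLock {
        def lock(outtime: Long, unit: TimeUnit): Boolean = true  // single process
        def unlock(): Unit = ()
      }
    }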
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
new file mode 100644
index 0000000..416ff31
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheClient.scala
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.griffin.measure.configuration.params.OffsetCacheParam
+import org.apache.griffin.measure.context.streaming.lock.{CacheLock, CacheLockSeq}
+
+object OffsetCacheClient extends OffsetCache with OffsetOps {
+ var offsetCaches: Seq[OffsetCache] = Nil
+
+ def initClient(offsetCacheParams: Iterable[OffsetCacheParam], metricName: String) = {
+ val fac = OffsetCacheFactory(offsetCacheParams, metricName)
+ offsetCaches = offsetCacheParams.flatMap(param => fac.getOffsetCache(param)).toList
+ }
+
+ def init(): Unit = offsetCaches.foreach(_.init)
+ def available(): Boolean = offsetCaches.foldLeft(false)(_ || _.available)
+ def close(): Unit = offsetCaches.foreach(_.close)
+
+ def cache(kvs: Map[String, String]): Unit = {
+ offsetCaches.foreach(_.cache(kvs))
+ }
+ def read(keys: Iterable[String]): Map[String, String] = {
+ val maps = offsetCaches.map(_.read(keys)).reverse
+ maps.fold(Map[String, String]())(_ ++ _)
+ }
+ def delete(keys: Iterable[String]): Unit = offsetCaches.foreach(_.delete(keys))
+ def clear(): Unit = offsetCaches.foreach(_.clear)
+
+ def listKeys(path: String): List[String] = {
+ offsetCaches.foldLeft(Nil: List[String]) { (res, offsetCache) =>
+ if (res.size > 0) res else offsetCache.listKeys(path)
+ }
+ }
+
+ def genLock(s: String): CacheLock = CacheLockSeq(offsetCaches.map(_.genLock(s)))
+
+}
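Note the priority in read: the per-cache maps are reversed before folding, so entries from the first configured cache win. Illustrative check of that merge:

    val maps = Seq(Map("k" -> "first"), Map("k" -> "second")).reverse
    val merged = maps.fold(Map[String, String]())(_ ++ _)
    assert(merged("k") == "first")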
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
new file mode 100644
index 0000000..07bca27
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheFactory.scala
@@ -0,0 +1,39 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.griffin.measure.configuration.params.OffsetCacheParam
+
+import scala.util.{Success, Try}
+
+case class OffsetCacheFactory(offsetCacheParams: Iterable[OffsetCacheParam], metricName: String
+ ) extends Serializable {
+
+ val ZK_REGEX = """^(?i)(zk|zookeeper)$""".r
+
+ def getOffsetCache(offsetCacheParam: OffsetCacheParam): Option[OffsetCache] = {
+ val config = offsetCacheParam.getConfig
+ val offsetCacheTry = offsetCacheParam.getType match {
+ case ZK_REGEX() => Try(OffsetCacheInZK(config, metricName))
+ case _ => throw new Exception("unsupported offset cache type")
+ }
+ offsetCacheTry.toOption
+ }
+
+}
\ No newline at end of file
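With the anchored regex above, the following type strings all select the zookeeper cache; anything else throws out of getOffsetCache, since the default case is not wrapped in Try:

    Seq("zk", "ZK", "zookeeper", "ZooKeeper").foreach { t =>
      assert("""^(?i)(zk|zookeeper)$""".r.pattern.matcher(t).matches())
    }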
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
new file mode 100644
index 0000000..9e020c2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetCacheInZK.scala
@@ -0,0 +1,214 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+import org.apache.curator.framework.imps.CuratorFrameworkState
+import org.apache.curator.framework.recipes.locks.InterProcessMutex
+import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory}
+import org.apache.curator.retry.ExponentialBackoffRetry
+import org.apache.curator.utils.ZKPaths
+import org.apache.griffin.measure.context.streaming.lock.CacheLockInZK
+import org.apache.zookeeper.CreateMode
+
+import scala.collection.JavaConverters._
+
+/**
+ * offset cache based on zookeeper
+ * @param config zookeeper config, e.g. hosts, namespace, mode
+ * @param metricName metric name, appended to the namespace of this cache
+ */
+case class OffsetCacheInZK(config: Map[String, Any], metricName: String) extends OffsetCache with OffsetOps {
+
+ val Hosts = "hosts"
+ val Namespace = "namespace"
+ val Mode = "mode"
+ val InitClear = "init.clear"
+ val CloseClear = "close.clear"
+ val LockPath = "lock.path"
+
+ val PersistRegex = """^(?i)persist$""".r
+ val EphemeralRegex = """^(?i)ephemeral$""".r
+
+ final val separator = ZKPaths.PATH_SEPARATOR
+
+ val hosts = config.getOrElse(Hosts, "").toString
+ val namespace = config.getOrElse(Namespace, "").toString
+ val mode: CreateMode = config.get(Mode) match {
+ case Some(s: String) => s match {
+ case PersistRegex() => CreateMode.PERSISTENT
+ case EphemeralRegex() => CreateMode.EPHEMERAL
+ case _ => CreateMode.PERSISTENT
+ }
+ case _ => CreateMode.PERSISTENT
+ }
+ val initClear = config.get(InitClear) match {
+ case Some(b: Boolean) => b
+ case _ => true
+ }
+ val closeClear = config.get(CloseClear) match {
+ case Some(b: Boolean) => b
+ case _ => false
+ }
+ val lockPath = config.getOrElse(LockPath, "lock").toString
+
+ private val cacheNamespace: String = if (namespace.isEmpty) metricName else namespace + separator + metricName
+ private val builder = CuratorFrameworkFactory.builder()
+ .connectString(hosts)
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+ .namespace(cacheNamespace)
+ private val client: CuratorFramework = builder.build
+
+ def init(): Unit = {
+ client.start()
+ info("start zk info cache")
+ client.usingNamespace(cacheNamespace)
+ info(s"init with namespace: ${cacheNamespace}")
+ delete(lockPath :: Nil)
+ if (initClear) {
+ clear
+ }
+ }
+
+ def available(): Boolean = {
+ client.getState match {
+ case CuratorFrameworkState.STARTED => true
+ case _ => false
+ }
+ }
+
+ def close(): Unit = {
+ if (closeClear) {
+ clear
+ }
+ info("close zk info cache")
+ client.close()
+ }
+
+ def cache(kvs: Map[String, String]): Unit = {
+ kvs.foreach(kv => createOrUpdate(path(kv._1), kv._2))
+ }
+
+ def read(keys: Iterable[String]): Map[String, String] = {
+ keys.flatMap { key =>
+ read(path(key)) match {
+ case Some(v) => Some((key, v))
+ case _ => None
+ }
+ }.toMap
+ }
+
+ def delete(keys: Iterable[String]): Unit = {
+ keys.foreach { key => delete(path(key)) }
+ }
+
+ def clear(): Unit = {
+// delete("/")
+ delete(finalCacheInfoPath :: Nil)
+ delete(infoPath :: Nil)
+ info("clear info")
+ }
+
+ def listKeys(p: String): List[String] = {
+ children(path(p))
+ }
+
+ def genLock(s: String): CacheLockInZK = {
+ val lpt = if (s.isEmpty) path(lockPath) else path(lockPath) + separator + s
+ CacheLockInZK(new InterProcessMutex(client, lpt))
+ }
+
+ private def path(k: String): String = {
+ if (k.startsWith(separator)) k else separator + k
+ }
+
+ private def children(path: String): List[String] = {
+ try {
+ client.getChildren().forPath(path).asScala.toList
+ } catch {
+ case e: Throwable => {
+ warn(s"list ${path} warn: ${e.getMessage}")
+ Nil
+ }
+ }
+ }
+
+ private def createOrUpdate(path: String, content: String): Boolean = {
+ if (checkExists(path)) {
+ update(path, content)
+ } else {
+ create(path, content)
+ }
+ }
+
+ private def create(path: String, content: String): Boolean = {
+ try {
+ client.create().creatingParentsIfNeeded().withMode(mode)
+ .forPath(path, content.getBytes("utf-8"))
+ true
+ } catch {
+ case e: Throwable => {
+ error(s"create ( ${path} -> ${content} ) error: ${e.getMessage}")
+ false
+ }
+ }
+ }
+
+ private def update(path: String, content: String): Boolean = {
+ try {
+ client.setData().forPath(path, content.getBytes("utf-8"))
+ true
+ } catch {
+ case e: Throwable => {
+ error(s"update ( ${path} -> ${content} ) error: ${e.getMessage}")
+ false
+ }
+ }
+ }
+
+ private def read(path: String): Option[String] = {
+ try {
+ Some(new String(client.getData().forPath(path), "utf-8"))
+ } catch {
+ case e: Throwable => {
+ warn(s"read ${path} warn: ${e.getMessage}")
+ None
+ }
+ }
+ }
+
+ private def delete(path: String): Unit = {
+ try {
+ client.delete().guaranteed().deletingChildrenIfNeeded().forPath(path)
+ } catch {
+ case e: Throwable => error(s"delete ${path} error: ${e.getMessage}")
+ }
+ }
+
+ private def checkExists(path: String): Boolean = {
+ try {
+ client.checkExists().forPath(path) != null
+ } catch {
+ case e: Throwable => {
+ warn(s"check exists ${path} warn: ${e.getMessage}")
+ false
+ }
+ }
+ }
+
+}
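An illustrative config for this cache, using the keys defined above (values are examples only):

    val zkConfig: Map[String, Any] = Map(
      "hosts" -> "localhost:2181",
      "namespace" -> "griffin/cache",
      "mode" -> "persist",          // or "ephemeral"
      "init.clear" -> true,
      "close.clear" -> false,
      "lock.path" -> "lock"
    )
    val offsetCache = OffsetCacheInZK(zkConfig, "dq_metric")
    // znodes live under the namespace "griffin/cache/dq_metric"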
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
new file mode 100644
index 0000000..c8763ec
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/context/streaming/offset/OffsetOps.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.context.streaming.offset
+
+trait OffsetOps extends Serializable { this: OffsetCache =>
+
+ val CacheTime = "cache.time"
+ val LastProcTime = "last.proc.time"
+ val ReadyTime = "ready.time"
+ val CleanTime = "clean.time"
+ val OldCacheIndex = "old.cache.index"
+
+ def cacheTime(path: String): String = s"${path}/${CacheTime}"
+ def lastProcTime(path: String): String = s"${path}/${LastProcTime}"
+ def readyTime(path: String): String = s"${path}/${ReadyTime}"
+ def cleanTime(path: String): String = s"${path}/${CleanTime}"
+ def oldCacheIndex(path: String): String = s"${path}/${OldCacheIndex}"
+
+ val infoPath = "info"
+
+ val finalCacheInfoPath = "info.final"
+ val finalReadyTime = s"${finalCacheInfoPath}/${ReadyTime}"
+ val finalLastProcTime = s"${finalCacheInfoPath}/${LastProcTime}"
+ val finalCleanTime = s"${finalCacheInfoPath}/${CleanTime}"
+
+ def startOffsetCache(): Unit = {
+ genFinalReadyTime
+ }
+
+ def getTimeRange(): (Long, Long) = {
+ readTimeRange
+ }
+
+ def getCleanTime(): Long = {
+ readCleanTime
+ }
+
+ def endOffsetCache(): Unit = {
+ genFinalLastProcTime
+ genFinalCleanTime
+ }
+
+ private def genFinalReadyTime(): Unit = {
+ val subPath = listKeys(infoPath)
+ val keys = subPath.map { p => s"${infoPath}/${p}/${ReadyTime}" }
+ val result = read(keys)
+ val times = keys.flatMap { k =>
+ getLongOpt(result, k)
+ }
+ if (times.nonEmpty) {
+ val time = times.min
+ val map = Map[String, String]((finalReadyTime -> time.toString))
+ cache(map)
+ }
+ }
+
+ private def genFinalLastProcTime(): Unit = {
+ val subPath = listKeys(infoPath)
+ val keys = subPath.map { p => s"${infoPath}/${p}/${LastProcTime}" }
+ val result = read(keys)
+ val times = keys.flatMap { k =>
+ getLongOpt(result, k)
+ }
+ if (times.nonEmpty) {
+ val time = times.min
+ val map = Map[String, String]((finalLastProcTime -> time.toString))
+ cache(map)
+ }
+ }
+
+ private def genFinalCleanTime(): Unit = {
+ val subPath = listKeys(infoPath)
+ val keys = subPath.map { p => s"${infoPath}/${p}/${CleanTime}" }
+ val result = read(keys)
+ val times = keys.flatMap { k =>
+ getLongOpt(result, k)
+ }
+ if (times.nonEmpty) {
+ val time = times.min
+ val map = Map[String, String]((finalCleanTime -> time.toString))
+ cache(map)
+ }
+ }
+
+ private def readTimeRange(): (Long, Long) = {
+ val map = read(List(finalLastProcTime, finalReadyTime))
+ val lastProcTime = getLong(map, finalLastProcTime)
+ val curReadyTime = getLong(map, finalReadyTime)
+ (lastProcTime, curReadyTime)
+ }
+
+ private def readCleanTime(): Long = {
+ val map = read(List(finalCleanTime))
+ val cleanTime = getLong(map, finalCleanTime)
+ cleanTime
+ }
+
+ private def getLongOpt(map: Map[String, String], key: String): Option[Long] = {
+ try {
+ map.get(key).map(_.toLong)
+ } catch {
+ case e: Throwable => None
+ }
+ }
+ private def getLong(map: Map[String, String], key: String) = {
+ getLongOpt(map, key).getOrElse(-1L)
+ }
+
+}
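The resulting key layout (illustrative, for a source cached at info path "0"):

    //   info/0/ready.time          per-source ready time
    //   info/0/last.proc.time      per-source last processed time
    //   info/0/clean.time          per-source clean time
    //   info.final/ready.time      min ready time over all sources
    //   info.final/last.proc.time  min last processed time over all sources
    //   info.final/clean.time      min clean time over all sources
    // e.g. readyTime("info/0") == "info/0/ready.time"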
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
new file mode 100644
index 0000000..8b068e3
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSource.scala
@@ -0,0 +1,96 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.context.{DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.connector.DataConnector
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.spark.sql._
+
+/**
+ * data source
+ * @param name name of data source
+ * @param dsParam param of this data source
+ * @param dataConnectors list of data connectors
+ * @param streamingCacheClientOpt streaming data cache client option
+ */
+case class DataSource(name: String,
+ dsParam: DataSourceParam,
+ dataConnectors: Seq[DataConnector],
+ streamingCacheClientOpt: Option[StreamingCacheClient]
+ ) extends Loggable with Serializable {
+
+ def init(): Unit = {
+ dataConnectors.foreach(_.init)
+ }
+
+ def loadData(context: DQContext): TimeRange = {
+ info(s"load data [${name}]")
+ val timestamp = context.contextId.timestamp
+ val (dfOpt, timeRange) = data(timestamp)
+ dfOpt match {
+ case Some(df) => {
+ context.runTimeTableRegister.registerTable(name, df)
+ }
+ case None => {
+ warn(s"load data source [${name}] fails")
+ }
+ }
+ timeRange
+ }
+
+ private def data(timestamp: Long): (Option[DataFrame], TimeRange) = {
+ val batches = dataConnectors.flatMap { dc =>
+ val (dfOpt, timeRange) = dc.data(timestamp)
+ dfOpt match {
+ case Some(df) => Some((dfOpt, timeRange))
+ case _ => None
+ }
+ }
+ val caches = streamingCacheClientOpt match {
+ case Some(dsc) => dsc.readData() :: Nil
+ case _ => Nil
+ }
+ val pairs = batches ++ caches
+
+ if (pairs.size > 0) {
+ pairs.reduce { (a, b) =>
+ (unionDfOpts(a._1, b._1), a._2.merge(b._2))
+ }
+ } else {
+ (None, TimeRange.emptyTimeRange)
+ }
+ }
+
+ def updateData(df: DataFrame): Unit = {
+ streamingCacheClientOpt.foreach(_.updateData(Some(df)))
+ }
+
+ def cleanOldData(): Unit = {
+ streamingCacheClientOpt.foreach(_.cleanOutTimeData)
+ }
+
+ def processFinish(): Unit = {
+ streamingCacheClientOpt.foreach(_.processFinish)
+ }
+
+}
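A sketch of the per-batch driver flow around this class (the DQContext and the dq steps are assumed to be built elsewhere):

    dataSource.init()
    val timeRange = dataSource.loadData(context)  // registers table under `name`
    // ... run dq steps against the registered table ...
    dataSource.cleanOldData()
    dataSource.processFinish()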
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
new file mode 100644
index 0000000..888d92a
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/DataSourceFactory.scala
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataSourceParam
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClientFactory
+import org.apache.griffin.measure.datasource.connector.{DataConnector, DataConnectorFactory}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.util.Success
+
+object DataSourceFactory extends Loggable {
+
+ def getDataSources(sparkSession: SparkSession,
+ ssc: StreamingContext,
+ dataSources: Seq[DataSourceParam]
+ ): Seq[DataSource] = {
+ dataSources.zipWithIndex.flatMap { pair =>
+ val (param, index) = pair
+ getDataSource(sparkSession, ssc, param, index)
+ }
+ }
+
+ private def getDataSource(sparkSession: SparkSession,
+ ssc: StreamingContext,
+ dataSourceParam: DataSourceParam,
+ index: Int
+ ): Option[DataSource] = {
+ val name = dataSourceParam.getName
+ val connectorParams = dataSourceParam.getConnectors
+ val timestampStorage = TimestampStorage()
+
+ // for streaming data cache
+ val streamingCacheClientOpt = StreamingCacheClientFactory.getClientOpt(
+ sparkSession.sqlContext, dataSourceParam.getCacheOpt, name, index, timestampStorage)
+
+ val dataConnectors: Seq[DataConnector] = connectorParams.flatMap { connectorParam =>
+ DataConnectorFactory.getDataConnector(sparkSession, ssc, connectorParam,
+ timestampStorage, streamingCacheClientOpt) match {
+ case Success(connector) => Some(connector)
+ case _ => None
+ }
+ }
+
+ Some(DataSource(name, dataSourceParam, dataConnectors, streamingCacheClientOpt))
+ }
+
+}
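Illustrative wiring at job start, assuming sparkSession, ssc and the parsed data source params are prepared by the launcher:

    val sources = DataSourceFactory.getDataSources(sparkSession, ssc, dsParams)
    sources.foreach(_.init())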
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
new file mode 100644
index 0000000..a305563
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/TimestampStorage.scala
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource
+
+import org.apache.griffin.measure.Loggable
+
+import scala.collection.mutable.{SortedSet => MutableSortedSet}
+
+/**
+ * tmst cache, CRUD of timestamps
+ */
+case class TimestampStorage() extends Loggable {
+
+ private val tmstGroup: MutableSortedSet[Long] = MutableSortedSet.empty[Long]
+
+ //-- insert tmst into tmst group --
+ def insert(tmst: Long) = tmstGroup += tmst
+ def insert(tmsts: Iterable[Long]) = tmstGroup ++= tmsts
+
+ //-- remove tmst from tmst group --
+ def remove(tmst: Long) = tmstGroup -= tmst
+ def remove(tmsts: Iterable[Long]) = tmstGroup --= tmsts
+
+ //-- get subset of tmst group --
+ def fromUntil(from: Long, until: Long) = tmstGroup.range(from, until).toSet
+ def afterTil(after: Long, til: Long) = tmstGroup.range(after + 1, til + 1).toSet
+ def until(until: Long) = tmstGroup.until(until).toSet
+ def from(from: Long) = tmstGroup.from(from).toSet
+ def all = tmstGroup.toSet
+
+}
\ No newline at end of file
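Range semantics at a glance (a minimal sketch):

    val ts = TimestampStorage()
    ts.insert(Seq(1L, 2L, 3L, 4L))
    ts.fromUntil(2, 4)   // Set(2, 3): [from, until)
    ts.afterTil(2, 4)    // Set(3, 4): (after, til]
    ts.until(3)          // Set(1, 2): strictly before `until`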
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
new file mode 100644
index 0000000..10c443e
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClient.scala
@@ -0,0 +1,366 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.TimeRange
+import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.utils.DataFrameUtil._
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.griffin.measure.utils.{HdfsUtil, TimeUtil}
+import org.apache.spark.sql._
+
+import scala.util.Random
+
+/**
+ * data source cache in streaming mode
+ * saves data frames to hdfs in the dump phase,
+ * reads them back from hdfs in the calculation phase,
+ * with update and clean actions for the cached data
+ */
+trait StreamingCacheClient extends StreamingOffsetCacheable with WithFanIn[Long] with Loggable with Serializable {
+
+ val sqlContext: SQLContext
+ val param: Map[String, Any]
+ val dsName: String
+ val index: Int
+
+ val timestampStorage: TimestampStorage
+ protected def fromUntilRangeTmsts(from: Long, until: Long) = timestampStorage.fromUntil(from, until)
+ protected def clearTmst(t: Long) = timestampStorage.remove(t)
+ protected def clearTmstsUntil(until: Long) = {
+ val outDateTmsts = timestampStorage.until(until)
+ timestampStorage.remove(outDateTmsts)
+ }
+ protected def afterTilRangeTmsts(after: Long, til: Long) = fromUntilRangeTmsts(after + 1, til + 1)
+ protected def clearTmstsTil(til: Long) = clearTmstsUntil(til + 1)
+
+ val _FilePath = "file.path"
+ val _InfoPath = "info.path"
+ val _ReadyTimeInterval = "ready.time.interval"
+ val _ReadyTimeDelay = "ready.time.delay"
+ val _TimeRange = "time.range"
+
+ val rdmStr = Random.alphanumeric.take(10).mkString
+ val defFilePath = s"hdfs:///griffin/cache/${dsName}_${rdmStr}"
+ val defInfoPath = s"${index}"
+
+ val filePath: String = param.getString(_FilePath, defFilePath)
+ val cacheInfoPath: String = param.getString(_InfoPath, defInfoPath)
+ val readyTimeInterval: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeInterval, "1m")).getOrElse(60000L)
+ val readyTimeDelay: Long = TimeUtil.milliseconds(param.getString(_ReadyTimeDelay, "1m")).getOrElse(60000L)
+ val deltaTimeRange: (Long, Long) = {
+ def negative(n: Long): Long = if (n <= 0) n else 0
+ param.get(_TimeRange) match {
+ case Some(seq: Seq[String]) => {
+ val nseq = seq.flatMap(TimeUtil.milliseconds(_))
+ val ns = negative(nseq.headOption.getOrElse(0))
+ val ne = negative(nseq.tail.headOption.getOrElse(0))
+ (ns, ne)
+ }
+ case _ => (0, 0)
+ }
+ }
+
+ val _ReadOnly = "read.only"
+ val readOnly = param.getBoolean(_ReadOnly, false)
+
+ val _Updatable = "updatable"
+ val updatable = param.getBoolean(_Updatable, false)
+
+ val newCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.new")
+ val oldCacheLock = OffsetCacheClient.genLock(s"${cacheInfoPath}.old")
+
+ val newFilePath = s"${filePath}/new"
+ val oldFilePath = s"${filePath}/old"
+
+ val defOldCacheIndex = 0L
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame
+
+ /**
+ * save data frame in dump phase
+ * @param dfOpt data frame to be saved
+ * @param ms timestamp of this data frame
+ */
+ def saveData(dfOpt: Option[DataFrame], ms: Long): Unit = {
+ if (!readOnly) {
+ dfOpt match {
+ case Some(df) => {
+ // cache df
+ df.cache
+
+ // count rows (materializes the cache)
+ val cnt = df.count
+ info(s"save ${dsName} data count: ${cnt}")
+
+ if (cnt > 0) {
+ // lock makes it safer when writing new cache data
+ val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (newCacheLocked) {
+ try {
+ val dfw = df.write.mode(SaveMode.Append).partitionBy(ConstantColumns.tmst)
+ writeDataFrame(dfw, newFilePath)
+ } catch {
+ case e: Throwable => error(s"save data error: ${e.getMessage}")
+ } finally {
+ newCacheLock.unlock()
+ }
+ }
+ }
+
+ // uncache df
+ df.unpersist
+ }
+ case _ => {
+ info(s"no data frame to save")
+ }
+ }
+
+ // submit cache time and ready time
+ if (fanIncrement(ms)) {
+ info(s"save data [${ms}] finish")
+ submitCacheTime(ms)
+ submitReadyTime(ms)
+ }
+
+ }
+ }
+
+ /**
+ * read data frame in calculation phase
+ * @return data frame to calculate, with the time range of data
+ */
+ def readData(): (Option[DataFrame], TimeRange) = {
+ // time range: (a, b]
+ val timeRange = OffsetCacheClient.getTimeRange
+ val reviseTimeRange = (timeRange._1 + deltaTimeRange._1, timeRange._2 + deltaTimeRange._2)
+
+ // read partition info
+ val filterStr = if (reviseTimeRange._1 == reviseTimeRange._2) {
+ info(s"read time range: [${reviseTimeRange._1}]")
+ s"`${ConstantColumns.tmst}` = ${reviseTimeRange._1}"
+ } else {
+ info(s"read time range: (${reviseTimeRange._1}, ${reviseTimeRange._2}]")
+ s"`${ConstantColumns.tmst}` > ${reviseTimeRange._1} AND `${ConstantColumns.tmst}` <= ${reviseTimeRange._2}"
+ }
+
+ // new cache data
+ val newDfOpt = try {
+ val dfr = sqlContext.read
+ Some(readDataFrame(dfr, newFilePath).filter(filterStr))
+ } catch {
+ case e: Throwable => {
+ warn(s"read data source cache warn: ${e.getMessage}")
+ None
+ }
+ }
+
+ // old cache data
+ val oldCacheIndexOpt = if (updatable) readOldCacheIndex else None
+ val oldDfOpt = oldCacheIndexOpt.flatMap { idx =>
+ val oldDfPath = s"${oldFilePath}/${idx}"
+ try {
+ val dfr = sqlContext.read
+ Some(readDataFrame(dfr, oldDfPath).filter(filterStr))
+ } catch {
+ case e: Throwable => {
+ warn(s"read old data source cache warn: ${e.getMessage}")
+ None
+ }
+ }
+ }
+
+ // whole cache data
+ val cacheDfOpt = unionDfOpts(newDfOpt, oldDfOpt)
+
+ // from until tmst range
+ val (from, until) = (reviseTimeRange._1, reviseTimeRange._2)
+ val tmstSet = afterTilRangeTmsts(from, until)
+
+ val retTimeRange = TimeRange(reviseTimeRange, tmstSet)
+ (cacheDfOpt, retTimeRange)
+ }
+
+ private def cleanOutTimePartitions(path: String, outTime: Long, partitionOpt: Option[String],
+ func: (Long, Long) => Boolean
+ ): Unit = {
+ val earlierOrEqPaths = listPartitionsByFunc(path: String, outTime, partitionOpt, func)
+ // delete out time data path
+ earlierOrEqPaths.foreach { path =>
+ info(s"delete hdfs path: ${path}")
+ HdfsUtil.deleteHdfsPath(path)
+ }
+ }
+ private def listPartitionsByFunc(path: String, bound: Long, partitionOpt: Option[String],
+ func: (Long, Long) => Boolean
+ ): Iterable[String] = {
+ val names = HdfsUtil.listSubPathsByType(path, "dir")
+ val regex = partitionOpt match {
+ case Some(partition) => s"^${partition}=(\\d+)$$".r
+ case _ => "^(\\d+)$".r
+ }
+ names.filter { name =>
+ name match {
+ case regex(value) => {
+ str2Long(value) match {
+ case Some(t) => func(t, bound)
+ case _ => false
+ }
+ }
+ case _ => false
+ }
+ }.map(name => s"${path}/${name}")
+ }
+ private def str2Long(str: String): Option[Long] = {
+ try {
+ Some(str.toLong)
+ } catch {
+ case e: Throwable => None
+ }
+ }
+
+ /**
+ * clean out-time cached data on hdfs
+ */
+ def cleanOutTimeData(): Unit = {
+ // clean tmst
+ val cleanTime = readCleanTime
+ cleanTime.foreach(clearTmstsTil(_))
+
+ if (!readOnly) {
+ // new cache data
+ val newCacheCleanTime = if (updatable) readLastProcTime else readCleanTime
+ newCacheCleanTime match {
+ case Some(nct) => {
+ // clean calculated new cache data
+ val newCacheLocked = newCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (newCacheLocked) {
+ try {
+ cleanOutTimePartitions(newFilePath, nct, Some(ConstantColumns.tmst),
+ (a: Long, b: Long) => (a <= b))
+ } catch {
+ case e: Throwable => error(s"clean new cache data error: ${e.getMessage}")
+ } finally {
+ newCacheLock.unlock()
+ }
+ }
+ }
+ case _ => {
+ // do nothing
+ }
+ }
+
+ // old cache data
+ val oldCacheCleanTime = if (updatable) readCleanTime else None
+ oldCacheCleanTime match {
+ case Some(oct) => {
+ val oldCacheIndexOpt = readOldCacheIndex
+ oldCacheIndexOpt.foreach { idx =>
+ val oldDfPath = s"${oldFilePath}/${idx}"
+ val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (oldCacheLocked) {
+ try {
+ // clean calculated old cache data
+ cleanOutTimePartitions(oldFilePath, idx, None, (a: Long, b: Long) => (a < b))
+ // clean out time old cache data not calculated
+// cleanOutTimePartitions(oldDfPath, oct, Some(InternalColumns.tmst))
+ } catch {
+ case e: Throwable => error(s"clean old cache data error: ${e.getMessage}")
+ } finally {
+ oldCacheLock.unlock()
+ }
+ }
+ }
+ }
+ case _ => {
+ // do nothing
+ }
+ }
+ }
+ }
+
+ /**
+ * update old cached data by new data frame
+ * @param dfOpt data frame to update old cached data
+ */
+ def updateData(dfOpt: Option[DataFrame]): Unit = {
+ if (!readOnly && updatable) {
+ dfOpt match {
+ case Some(df) => {
+ // old cache lock
+ val oldCacheLocked = oldCacheLock.lock(-1, TimeUnit.SECONDS)
+ if (oldCacheLocked) {
+ try {
+ val oldCacheIndexOpt = readOldCacheIndex
+ val nextOldCacheIndex = oldCacheIndexOpt.getOrElse(defOldCacheIndex) + 1
+
+ val oldDfPath = s"${oldFilePath}/${nextOldCacheIndex}"
+ val cleanTime = getNextCleanTime
+ val filterStr = s"`${ConstantColumns.tmst}` > ${cleanTime}"
+ val updateDf = df.filter(filterStr)
+
+ val prlCount = sqlContext.sparkContext.defaultParallelism
+ // repartition
+ val repartitionedDf = updateDf.repartition(prlCount)
+ val dfw = repartitionedDf.write.mode(SaveMode.Overwrite)
+ writeDataFrame(dfw, oldDfPath)
+
+ submitOldCacheIndex(nextOldCacheIndex)
+ } catch {
+ case e: Throwable => error(s"update data error: ${e.getMessage}")
+ } finally {
+ oldCacheLock.unlock()
+ }
+ }
+ }
+ case _ => {
+ info(s"no data frame to update")
+ }
+ }
+ }
+ }
+
+ /**
+ * each time calculation phase finishes,
+ * data source cache needs to submit some cache information
+ */
+ def processFinish(): Unit = {
+ // next last proc time
+ val timeRange = OffsetCacheClient.getTimeRange
+ submitLastProcTime(timeRange._2)
+
+ // next clean time
+ val nextCleanTime = timeRange._2 + deltaTimeRange._1
+ submitCleanTime(nextCleanTime)
+ }
+
+ // read next clean time
+ private def getNextCleanTime(): Long = {
+ val timeRange = OffsetCacheClient.getTimeRange
+ val nextCleanTime = timeRange._2 + deltaTimeRange._1
+ nextCleanTime
+ }
+
+}
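An illustrative cache config covering the keys this trait reads (values are examples only):

    val cacheParam: Map[String, Any] = Map(
      "file.path" -> "hdfs:///griffin/cache/src0",
      "info.path" -> "0",
      "ready.time.interval" -> "1m",
      "ready.time.delay" -> "1m",
      "time.range" -> Seq("-2m", "0"),  // widens the read range (a, b] to (a - 2m, b]
      "read.only" -> false,
      "updatable" -> true
    )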
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
new file mode 100644
index 0000000..d2e7771
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheClientFactory.scala
@@ -0,0 +1,68 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.utils.ParamUtil._
+import org.apache.spark.sql.SQLContext
+
+object StreamingCacheClientFactory extends Loggable {
+
+ private object DataSourceCacheType {
+ val ParquetRegex = "^(?i)parq(uet)?$".r
+ val JsonRegex = "^(?i)json$".r
+ val OrcRegex = "^(?i)orc$".r
+ }
+ import DataSourceCacheType._
+
+ val _type = "type"
+
+ /**
+ * create streaming cache client
+ * @param sqlContext sqlContext in spark environment
+ * @param cacheOpt data source cache config option
+ * @param name data source name
+ * @param index data source index
+ * @param tmstCache the same tmstCache instance inside a data source
+ * @return streaming cache client option
+ */
+ def getClientOpt(sqlContext: SQLContext, cacheOpt: Option[Map[String, Any]],
+ name: String, index: Int, tmstCache: TimestampStorage
+ ): Option[StreamingCacheClient] = {
+ cacheOpt.flatMap { param =>
+ try {
+ val tp = param.getString(_type, "")
+ val dsCache = tp match {
+ case ParquetRegex() => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ case JsonRegex() => StreamingCacheJsonClient(sqlContext, param, name, index, tmstCache)
+ case OrcRegex() => StreamingCacheOrcClient(sqlContext, param, name, index, tmstCache)
+ case _ => StreamingCacheParquetClient(sqlContext, param, name, index, tmstCache)
+ }
+ Some(dsCache)
+ } catch {
+ case e: Throwable => {
+ error(s"generate data source cache fails")
+ None
+ }
+ }
+ }
+ }
+
+}
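Illustrative selection of a parquet-backed cache, reusing the cacheParam map sketched above (sqlContext is assumed from the spark environment):

    import org.apache.griffin.measure.datasource.TimestampStorage
    val clientOpt = StreamingCacheClientFactory.getClientOpt(
      sqlContext, Some(cacheParam + ("type" -> "parquet")),
      "src0", 0, TimestampStorage())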
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
new file mode 100644
index 0000000..a12ef87
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheJsonClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql._
+
+/**
+ * data source cache in json format
+ */
+case class StreamingCacheJsonClient(sqlContext: SQLContext, param: Map[String, Any],
+ dsName: String, index: Int, timestampStorage: TimestampStorage
+ ) extends StreamingCacheClient {
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+ info(s"write path: ${path}")
+ dfw.json(path)
+ }
+
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+ dfr.json(path)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
new file mode 100644
index 0000000..63e7104
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheOrcClient.scala
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql._
+
+/**
+ * data source cache in orc format
+ */
+case class StreamingCacheOrcClient(sqlContext: SQLContext, param: Map[String, Any],
+ dsName: String, index: Int, timestampStorage: TimestampStorage
+ ) extends StreamingCacheClient {
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+ info(s"write path: ${path}")
+ dfw.orc(path)
+ }
+
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+ dfr.orc(path)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
new file mode 100644
index 0000000..c275227
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingCacheParquetClient.scala
@@ -0,0 +1,42 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.spark.sql._
+
+/**
+ * data source cache in parquet format
+ */
+case class StreamingCacheParquetClient(sqlContext: SQLContext, param: Map[String, Any],
+ dsName: String, index: Int, timestampStorage: TimestampStorage
+ ) extends StreamingCacheClient {
+
+ sqlContext.sparkContext.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
+
+ protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = {
+ info(s"write path: ${path}")
+ dfw.parquet(path)
+ }
+
+ protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = {
+ dfr.parquet(path)
+ }
+
+}
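
The three streaming cache clients above differ only in storage format: each supplies the write/read hooks that the shared StreamingCacheClient trait leaves abstract, while the base trait owns the caching logic. (The parquet client additionally sets parquet.enable.summary-metadata to false, which stops Spark from writing the extra _metadata summary files on each flush.) A minimal sketch of that template-method shape, assuming — since the base trait's definition sits outside this hunk — that writeDataFrame and readDataFrame are its only abstract members:

    import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}

    // reduced, hypothetical stand-in for StreamingCacheClient, for illustration only
    trait FormatHooks {
      protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit
      protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame

      // shared logic calls the hooks without knowing the concrete format
      def save(df: DataFrame, path: String): Unit = writeDataFrame(df.write, path)
      def load(dfr: DataFrameReader, path: String): DataFrame = readDataFrame(dfr, path)
    }

    // a new format needs only the two hooks, exactly as the json/orc/parquet clients do
    class CsvHooks extends FormatHooks {
      protected def writeDataFrame(dfw: DataFrameWriter[Row], path: String): Unit = dfw.csv(path)
      protected def readDataFrame(dfr: DataFrameReader, path: String): DataFrame = dfr.csv(path)
    }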
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
new file mode 100644
index 0000000..3195d24
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/StreamingOffsetCacheable.scala
@@ -0,0 +1,88 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.context.streaming.offset.OffsetCacheClient
+
+/**
+ * timestamp offset management for a streaming data source cache
+ */
+trait StreamingOffsetCacheable extends Loggable with Serializable {
+
+ val cacheInfoPath: String
+ val readyTimeInterval: Long
+ val readyTimeDelay: Long
+
+ def selfCacheInfoPath = s"${OffsetCacheClient.infoPath}/${cacheInfoPath}"
+
+ def selfCacheTime = OffsetCacheClient.cacheTime(selfCacheInfoPath)
+ def selfLastProcTime = OffsetCacheClient.lastProcTime(selfCacheInfoPath)
+ def selfReadyTime = OffsetCacheClient.readyTime(selfCacheInfoPath)
+ def selfCleanTime = OffsetCacheClient.cleanTime(selfCacheInfoPath)
+ def selfOldCacheIndex = OffsetCacheClient.oldCacheIndex(selfCacheInfoPath)
+
+ protected def submitCacheTime(ms: Long): Unit = {
+ val map = Map[String, String](selfCacheTime -> ms.toString)
+ OffsetCacheClient.cache(map)
+ }
+
+ protected def submitReadyTime(ms: Long): Unit = {
+ val curReadyTime = ms - readyTimeDelay
+ if (curReadyTime % readyTimeInterval == 0) {
+ val map = Map[String, String](selfReadyTime -> curReadyTime.toString)
+ OffsetCacheClient.cache(map)
+ }
+ }
+
+ protected def submitLastProcTime(ms: Long): Unit = {
+ val map = Map[String, String](selfLastProcTime -> ms.toString)
+ OffsetCacheClient.cache(map)
+ }
+
+ protected def readLastProcTime(): Option[Long] = readSelfInfo(selfLastProcTime)
+
+ protected def submitCleanTime(ms: Long): Unit = {
+ val cleanTime = genCleanTime(ms)
+ val map = Map[String, String](selfCleanTime -> cleanTime.toString)
+ OffsetCacheClient.cache(map)
+ }
+
+ protected def genCleanTime(ms: Long): Long = ms
+
+ protected def readCleanTime(): Option[Long] = readSelfInfo(selfCleanTime)
+
+ protected def submitOldCacheIndex(index: Long): Unit = {
+ val map = Map[String, String](selfOldCacheIndex -> index.toString)
+ OffsetCacheClient.cache(map)
+ }
+
+ def readOldCacheIndex(): Option[Long] = readSelfInfo(selfOldCacheIndex)
+
+ private def readSelfInfo(key: String): Option[Long] = {
+ OffsetCacheClient.read(key :: Nil).get(key).flatMap { v =>
+ try {
+ Some(v.toLong)
+ } catch {
+ case _: Throwable => None
+ }
+ }
+ }
+
+}
\ No newline at end of file
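
Note the alignment in submitReadyTime: a ready time is cached only when the delayed timestamp lands exactly on an interval boundary. A self-contained sketch of that check, with readyTimeInterval and readyTimeDelay given illustrative values that do not come from this patch:

    object ReadyTimeCheck {
      val readyTimeInterval = 60000L // one minute, assumed for illustration
      val readyTimeDelay = 30000L    // half a minute, assumed for illustration

      // mirrors the boundary test inside submitReadyTime
      def readyTimeToSubmit(ms: Long): Option[Long] = {
        val curReadyTime = ms - readyTimeDelay
        if (curReadyTime % readyTimeInterval == 0) Some(curReadyTime) else None
      }

      def main(args: Array[String]): Unit = {
        println(readyTimeToSubmit(90000L))  // Some(60000): 90000 - 30000 is a multiple of 60000
        println(readyTimeToSubmit(100000L)) // None: 70000 misses the boundary
      }
    }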
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
new file mode 100644
index 0000000..413b5a2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/cache/WithFanIn.scala
@@ -0,0 +1,69 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.cache
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.concurrent.{TrieMap, Map => ConcMap}
+
+/**
+ * fan-in trait: multiple inputs converge into one output,
+ * supporting multiple parallel data connectors in one data source
+ */
+trait WithFanIn[T] {
+
+ // total input number
+ val totalNum: AtomicInteger = new AtomicInteger(0)
+ // concurrent map of fan in count for each key
+ val fanInCountMap: ConcMap[T, Int] = TrieMap[T, Int]()
+
+ def registerFanIn(): Int = {
+ totalNum.incrementAndGet()
+ }
+
+ /**
+ * increment the fan-in count for a key, testing whether all parallel inputs have finished
+ * @param key the key whose fan-in count is incremented
+ * @return true if all registered inputs have arrived for this key
+ */
+ def fanIncrement(key: T): Boolean = {
+ fanInc(key)
+ fanInCountMap.get(key) match {
+ case Some(n) if (n >= totalNum.get) => {
+ fanInCountMap.remove(key)
+ true
+ }
+ case _ => false
+ }
+ }
+
+ private def fanInc(key: T): Unit = {
+ fanInCountMap.get(key) match {
+ case Some(n) => {
+ val suc = fanInCountMap.replace(key, n, n + 1)
+ if (!suc) fanInc(key)
+ }
+ case _ => {
+ val oldOpt = fanInCountMap.putIfAbsent(key, 1)
+ if (oldOpt.nonEmpty) fanInc(key)
+ }
+ }
+ }
+
+}
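
A short usage sketch of WithFanIn: after three connectors register, only the final fanIncrement for a given key reports completion. FanInDemo is a hypothetical harness, not part of this patch, assumed to sit in the same package as the trait:

    object FanInDemo extends WithFanIn[Long] {
      def main(args: Array[String]): Unit = {
        (1 to 3).foreach(_ => registerFanIn()) // totalNum becomes 3
        val key = 100L // e.g. a timestamp shared by the parallel inputs
        println(fanIncrement(key)) // false: 1 of 3 inputs arrived
        println(fanIncrement(key)) // false: 2 of 3
        println(fanIncrement(key)) // true: all inputs arrived, key entry removed
      }
    }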
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
new file mode 100644
index 0000000..1abbd30
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnector.scala
@@ -0,0 +1,112 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.enums.{BatchProcessType, DslType, SparkSqlType}
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.context.{ContextId, DQContext, TimeRange}
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.job.builder.DQJobBuilder
+import org.apache.griffin.measure.step.builder.ConstantColumns
+import org.apache.griffin.measure.step.builder.preproc.PreProcRuleParamGenerator
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions._
+
+trait DataConnector extends Loggable with Serializable {
+
+ @transient val sparkSession: SparkSession
+
+ val dcParam: DataConnectorParam
+
+ val id: String = DataConnectorIdGenerator.genId
+ protected def thisName(suffix: String): String = s"this_${suffix}"
+
+ val timestampStorage: TimestampStorage
+ protected def saveTmst(t: Long) = timestampStorage.insert(t)
+ protected def readTmst(t: Long) = timestampStorage.fromUntil(t, t + 1)
+
+ def init(): Unit
+
+ // get data frame in batch mode
+ def data(ms: Long): (Option[DataFrame], TimeRange)
+
+ private def createContext(t: Long): DQContext = {
+ DQContext(ContextId(t, id), id, Nil, Nil, BatchProcessType)(sparkSession)
+ }
+
+ def preProcess(dfOpt: Option[DataFrame], ms: Long): Option[DataFrame] = {
+ // new context
+ val context = createContext(ms)
+
+ val timestamp = context.contextId.timestamp
+ val suffix = context.contextId.id
+ val thisTable = thisName(suffix)
+
+ try {
+ saveTmst(timestamp) // save timestamp
+
+ dfOpt.flatMap { df =>
+ val preProcRules = PreProcRuleParamGenerator.getNewPreProcRules(dcParam.getPreProcRules, suffix)
+
+ // init data
+ context.compileTableRegister.registerTable(thisTable)
+ context.runTimeTableRegister.registerTable(thisTable, df)
+
+ // build job
+ val preprocJob = DQJobBuilder.buildDQJob(context, preProcRules)
+
+ // job execute
+ preprocJob.execute(context)
+
+ // out data
+ val outDf = context.sparkSession.table(s"`${thisTable}`")
+
+ // add tmst column
+ val withTmstDf = outDf.withColumn(ConstantColumns.tmst, lit(timestamp))
+
+ // clean context
+ context.clean()
+
+ Some(withTmstDf)
+ }
+
+ } catch {
+ case e: Throwable => {
+ error(s"pre-process of data connector [${id}] error: ${e.getMessage}")
+ None
+ }
+ }
+ }
+}
+
+object DataConnectorIdGenerator {
+ private val counter: AtomicLong = new AtomicLong(0L)
+ private val head: String = "dc"
+
+ def genId: String = {
+ s"${head}${increment}"
+ }
+
+ private def increment: Long = {
+ counter.incrementAndGet()
+ }
+}
\ No newline at end of file
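
In plain Spark terms, preProcess above registers the incoming DataFrame under a per-connector table name, lets the generated rule job rewrite it through SQL, and stamps the result with the context timestamp. A reduced sketch of the same flow using only public Spark APIs — the table name, identity rule, and column name below are illustrative stand-ins, not values from this patch:

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.lit

    def preProcessSketch(spark: SparkSession, df: DataFrame, timestamp: Long): DataFrame = {
      val thisTable = "this_dc1" // per-connector name, like thisName(suffix)
      df.createOrReplaceTempView(thisTable)
      // a pre-proc rule is, in effect, SQL over the registered table;
      // this identity query stands in for the generated rule job
      val outDf = spark.sql(s"SELECT * FROM `${thisTable}`")
      outDf.withColumn("__tmst", lit(timestamp)) // plays the role of ConstantColumns.tmst
    }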
http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/6b389b31/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
----------------------------------------------------------------------
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
new file mode 100644
index 0000000..4edd4d2
--- /dev/null
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/DataConnectorFactory.scala
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.griffin.measure.datasource.connector
+
+import org.apache.griffin.measure.Loggable
+import org.apache.griffin.measure.configuration.params.DataConnectorParam
+import org.apache.griffin.measure.datasource.TimestampStorage
+import org.apache.griffin.measure.datasource.cache.StreamingCacheClient
+import org.apache.griffin.measure.datasource.connector.batch._
+import org.apache.griffin.measure.datasource.connector.streaming._
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.streaming.StreamingContext
+
+import scala.reflect.ClassTag
+import scala.util.Try
+
+object DataConnectorFactory extends Loggable {
+
+ val HiveRegex = """^(?i)hive$""".r
+ val AvroRegex = """^(?i)avro$""".r
+ val TextDirRegex = """^(?i)text-dir$""".r
+
+ val KafkaRegex = """^(?i)kafka$""".r
+
+ /**
+ * create data connector
+ * @param sparkSession spark env
+ * @param ssc spark streaming env
+ * @param dcParam data connector param
+ * @param tmstCache timestamp cache shared within one data source
+ * @param streamingCacheClientOpt optional cache client for streaming data
+ * @return data connector
+ */
+ def getDataConnector(sparkSession: SparkSession,
+ ssc: StreamingContext,
+ dcParam: DataConnectorParam,
+ tmstCache: TimestampStorage,
+ streamingCacheClientOpt: Option[StreamingCacheClient]
+ ): Try[DataConnector] = {
+ val conType = dcParam.getType
+ val version = dcParam.getVersion
+ Try {
+ conType match {
+ case HiveRegex() => HiveBatchDataConnector(sparkSession, dcParam, tmstCache)
+ case AvroRegex() => AvroBatchDataConnector(sparkSession, dcParam, tmstCache)
+ case TextDirRegex() => TextDirBatchDataConnector(sparkSession, dcParam, tmstCache)
+ case KafkaRegex() => {
+ getStreamingDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+ }
+ case _ => throw new Exception("connector creation error!")
+ }
+ }
+ }
+
+ private def getStreamingDataConnector(sparkSession: SparkSession,
+ ssc: StreamingContext,
+ dcParam: DataConnectorParam,
+ tmstCache: TimestampStorage,
+ streamingCacheClientOpt: Option[StreamingCacheClient]
+ ): StreamingDataConnector = {
+ if (ssc == null) throw new Exception("streaming context is null!")
+ val conType = dcParam.getType
+ val version = dcParam.getVersion
+ conType match {
+ case KafkaRegex() => getKafkaDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+ case _ => throw new Exception("streaming connector creation error!")
+ }
+ }
+
+ private def getKafkaDataConnector(sparkSession: SparkSession,
+ ssc: StreamingContext,
+ dcParam: DataConnectorParam,
+ tmstCache: TimestampStorage,
+ streamingCacheClientOpt: Option[StreamingCacheClient]
+ ): KafkaStreamingDataConnector = {
+ val KeyType = "key.type"
+ val ValueType = "value.type"
+ val config = dcParam.config
+ val keyType = config.getOrElse(KeyType, "java.lang.String").toString
+ val valueType = config.getOrElse(ValueType, "java.lang.String").toString
+ (getClassTag(keyType), getClassTag(valueType)) match {
+ case (k, v) if k.runtimeClass == classOf[String] && v.runtimeClass == classOf[String] => {
+ KafkaStreamingStringDataConnector(sparkSession, ssc, dcParam, tmstCache, streamingCacheClientOpt)
+ }
+ case _ => {
+ throw new Exception(s"unsupported kafka data connector key/value type: (${keyType}, ${valueType})")
+ }
+ }
+ }
+
+ private def getClassTag(tp: String): ClassTag[_] = {
+ ClassTag(Class.forName(tp))
+ }
+
+// def filterDataConnectors[T <: DataConnector : ClassTag](connectors: Seq[DataConnector]): Seq[T] = {
+// connectors.flatMap { dc =>
+// dc match {
+// case mdc: T => Some(mdc)
+// case _ => None
+// }
+// }
+// }
+
+}
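
The factory dispatches on the connector type through anchored, case-insensitive regexes, so "hive", "Hive" and "HIVE" all select the same branch. A self-contained sketch of that dispatch, with the connector constructors replaced by strings for illustration:

    object TypeDispatchDemo {
      val HiveRegex = """^(?i)hive$""".r
      val AvroRegex = """^(?i)avro$""".r

      def dispatch(conType: String): String = conType match {
        case HiveRegex() => "hive batch connector"
        case AvroRegex() => "avro batch connector"
        case other => throw new Exception(s"connector creation error: unsupported type ${other}")
      }

      def main(args: Array[String]): Unit = {
        println(dispatch("HIVE")) // hive batch connector
        println(dispatch("Avro")) // avro batch connector
      }
    }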