You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@griffin.apache.org by wa...@apache.org on 2020/02/08 08:15:13 UTC

[griffin] branch master updated: [GRIFFIN-322] Add SQL mode for ES connector

This is an automated email from the ASF dual-hosted git repository.

wankun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/griffin.git


The following commit(s) were added to refs/heads/master by this push:
     new 727e961  [GRIFFIN-322] Add SQL mode for ES connector
727e961 is described below

commit 727e9617c9f408f252035246b469788354fe1a69
Author: yuxiaoyu <yu...@bytedance.com>
AuthorDate: Sat Feb 8 16:15:03 2020 +0800

    [GRIFFIN-322] Add SQL mode for ES connector
    
    As  [GRIFFIN-322](https://issues.apache.org/jira/projects/GRIFFIN/issues/GRIFFIN-322?filter=allopenissues) , we want add sql mode for es connector.
    
    **The sql mode would more effective and user-friendly.**
    
    Current mode config:
    {   "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
        "index": "index-xxx",
        "type": "metric",
        "host": "xxxxxxxxxx",
        "port": "xxxx",
        "fields": ["col_a", "col_b", "col_c"],
        "size": 100}
    
    SQL mode config:
    {    "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
         "sql.mode": true,
         "host": "xxxxxxxxxx",
         "port": "xxxx",
         "sql": "select col_a, col_b, col_c from index-xx limit 100"}
    
    Compared with current mode, SQL mode could support other types except number type.
    
    Author: yuxiaoyu <yu...@bytedance.com>
    
    Closes #567 from XiaoyuBD/enrichEsConnectorAddSqlMode.
---
 griffin-doc/measure/measure-configuration-guide.md | 56 ++++++++++++++++++++++
 .../batch/ElasticSearchGriffinDataConnector.scala  | 55 ++++++++++++++++++---
 2 files changed, 105 insertions(+), 6 deletions(-)

diff --git a/griffin-doc/measure/measure-configuration-guide.md b/griffin-doc/measure/measure-configuration-guide.md
index d1a6c36..4d7594c 100644
--- a/griffin-doc/measure/measure-configuration-guide.md
+++ b/griffin-doc/measure/measure-configuration-guide.md
@@ -248,6 +248,62 @@ List of supported data connectors:
  **Note:** User-provided data connector should be present in Spark job's class path, by either providing custom jar with 
 `--jars` parameter to spark-submit or by adding setting `spark.jars` in `spark -> config` section of environment config.  
 
+ ##### For ElasticSearch Custom Data Connectors:
+  - Currently supported SQL mode (for ElasticSearch with sql plugin) and NORMAL mode.
+  - For NORMAL mode, config object supports the following keys,
+  
+ | Name       | Type     | Description                            | Default Values |
+ |:-----------|:---------|:---------------------------------------|:-------------- |
+ | index      | `String` | ElasticSearch index name| default |
+ | type       | `String` | ElasticSearch data type | accuracy |
+ | host       | `String` | ElasticSearch url host | `Empty` |
+ | port       | `String` | ElasticSearch url port | `Empty` |
+ | fields     | `List`   | list of columns | `Empty` |
+ | size       | `Integer`| data size (lines) to load | 100 |
+ | metric.name| `String` | metric name to load | * |
+
+ - Example:
+      ```
+     "connectors": [
+       { 
+         "type": "custom",
+         "config": {
+           "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
+           "index": "test-index-v1",
+           "type": "metric",
+           "host": "test.es-xxx.org",
+           "port": "80",
+           "fields": ["col_a", "col_b", "col_c"],
+           "size": 20
+         }
+       }
+     ]
+      ```
+  - For SQL mode, config object supports the following keys,
+  
+ | Name       | Type     | Description                            | Default Values |
+ |:-----------|:---------|:---------------------------------------|:-------------- |
+ | host       | `String` | ElasticSearch url host | `Empty` |
+ | port       | `String` | ElasticSearch url port | `Empty` |
+ | sql.mode   | `Boolean`| use sql mode | false |
+ | sql        | `String` | ElasticSearch SQL | `Empty` |
+
+ - Example:
+      ```
+     "connectors": [
+       { 
+         "type": "custom",
+         "config": {
+           "class": "org.apache.griffin.measure.datasource.connector.batch.ElasticSearchGriffinDataConnector",
+           "host": "test.es-xxx.org",
+           "port": "80",
+           "sql.mode": true,
+           "sql": "select col_a, col_b, col_c from test-index-v1 limit 20"
+         }
+       }
+     ]
+      ```
+ 
  ##### For File based Data Connectors:
 
  - Currently supported formats like Parquet, ORC, AVRO, Text and Delimited types like CSV, TSV etc.
diff --git a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
index b70ba51..93ccb79 100644
--- a/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
+++ b/measure/src/main/scala/org/apache/griffin/measure/datasource/connector/batch/ElasticSearchGriffinDataConnector.scala
@@ -26,10 +26,11 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpRequestBase}
+import org.apache.http.entity.{ContentType, StringEntity}
 import org.apache.http.impl.client.{BasicResponseHandler, HttpClientBuilder}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, DataFrameReader, Row, SparkSession}
 import org.apache.spark.sql.types.{StructField, StructType}
 
 import org.apache.griffin.measure.configuration.dqdefinition.DataConnectorParam
@@ -53,6 +54,8 @@ case class ElasticSearchGriffinDataConnector(
   val Fields = "fields"
   val Size = "size"
   val MetricName = "metric.name"
+  val Sql = "sql"
+  val SqlMode = "sql.mode"
   val index: String = config.getString(Index, "default")
   val dataType: String = config.getString(Type, "accuracy")
   val metricName: String = config.getString(MetricName, "*")
@@ -60,15 +63,44 @@ case class ElasticSearchGriffinDataConnector(
   val version: String = config.getString(EsVersion, "")
   val port: String = config.getString(Port, "")
   val fields: Seq[String] = config.getStringArr(Fields, Seq[String]())
+  val sql: String = config.getString(Sql, "")
+  val sqlMode: Boolean = config.getBoolean(SqlMode, false)
   val size: Int = config.getInt(Size, 100)
 
   override def data(ms: Long): (Option[DataFrame], TimeRange) = {
+    if (sqlMode) dataBySql(ms) else dataBySearch(ms)
+  }
+
+  def dataBySql(ms: Long): (Option[DataFrame], TimeRange) = {
+    val path: String = s"/_sql?format=csv"
+    info(s"ElasticSearchGriffinDataConnector data : sql: $sql")
+    val dfOpt = try {
+      val answer = httpPost(path, sql)
+      if (answer._1) {
+        import sparkSession.implicits._
+        val rdd: RDD[String] = sparkSession.sparkContext.parallelize(answer._2.lines.toList)
+        val reader: DataFrameReader = sparkSession.read
+        reader.option("header", true).option("inferSchema", true)
+        val df: DataFrame = reader.csv(rdd.toDS())
+        val dfOpt = Some(df)
+        val preDfOpt = preProcess(dfOpt, ms)
+        preDfOpt
+      } else None
+    } catch {
+      case e: Throwable =>
+        error(s"load ES by sql $host:$port $sql  fails: ${e.getMessage}", e)
+        None
+    }
+    val tmsts = readTmst(ms)
+    (dfOpt, TimeRange(ms, tmsts))
+  }
 
+  def dataBySearch(ms: Long): (Option[DataFrame], TimeRange) = {
     val path: String = s"/$index/$dataType/_search?sort=tmst:desc&q=name:$metricName&size=$size"
     info(s"ElasticSearchGriffinDataConnector data : host: $host port: $port path:$path")
 
     val dfOpt = try {
-      val answer = httpRequest(path)
+      val answer = httpGet(path)
       val data = ArrayBuffer[Map[String, Number]]()
 
       if (answer._1) {
@@ -109,19 +141,30 @@ case class ElasticSearchGriffinDataConnector(
     (dfOpt, TimeRange(ms, tmsts))
   }
 
-  def httpRequest(path: String): (Boolean, String) = {
+  def httpGet(path: String): (Boolean, String) = {
+    val url: String = s"$getBaseUrl$path"
+    info(s"url:$url")
+    val uri: URI = new URI(url)
+    val request = new HttpGet(uri)
+    doRequest(request)
+  }
 
+  def httpPost(path: String, body: String): (Boolean, String) = {
     val url: String = s"$getBaseUrl$path"
     info(s"url:$url")
     val uri: URI = new URI(url)
-    val request = new org.apache.http.client.methods.HttpGet(uri)
+    val request = new HttpPost(uri)
+    request.setEntity(new StringEntity(body, ContentType.APPLICATION_JSON))
+    doRequest(request)
+  }
+
+  def doRequest(request: HttpRequestBase): (Boolean, String) = {
     request.addHeader("Content-Type", "application/json")
     request.addHeader("Charset", "UTF-8")
     val client = HttpClientBuilder.create().build()
     val response: CloseableHttpResponse = client.execute(request)
     val handler = new BasicResponseHandler()
     (true, handler.handleResponse(response).trim)
-
   }
 
   def parseString(data: String): JsonNode = {