You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ho...@apache.org on 2022/04/21 01:09:44 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #2253] [Improvement] Trino Engine - Events support
This is an automated email from the ASF dual-hosted git repository.
hongdd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4ec707b74 [KYUUBI #2253] [Improvement] Trino Engine - Events support
4ec707b74 is described below
commit 4ec707b7427e90c9a50e70b78d7b5ccf24884d59
Author: Min Zhao <zh...@163.com>
AuthorDate: Thu Apr 21 09:09:24 2022 +0800
[KYUUBI #2253] [Improvement] Trino Engine - Events support
### _Why are the changes needed?_
add event support for trino engine.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #2401 from zhaomin1423/trino_event1.
Closes #2253
dc3c2c2b [Min Zhao] fix conflict
caec1cfd [Min Zhao] fix conflict
7ca01691 [Min Zhao] add trino event
Authored-by: Min Zhao <zh...@163.com>
Signed-off-by: hongdongdong <ho...@cmss.chinamobile.com>
---
externals/kyuubi-trino-engine/pom.xml | 6 +
.../kyuubi/engine/trino/TrinoSqlEngine.scala | 46 ++++++-
.../engine/trino/event/TrinoEngineEvent.scala | 74 +++++++++++
.../engine/trino/event/TrinoOperationEvent.scala | 79 ++++++++++++
.../engine/trino/event/TrinoSessionEvent.scala | 55 +++++++++
.../handler/TrinoJsonLoggingEventHandler.scala | 29 +++++
.../engine/trino/operation/ExecuteStatement.scala | 4 +
.../engine/trino/session/TrinoSessionImpl.scala | 17 +++
.../engine/trino/event/TrinoSqlEventSuite.scala | 137 +++++++++++++++++++++
9 files changed, 445 insertions(+), 2 deletions(-)
diff --git a/externals/kyuubi-trino-engine/pom.xml b/externals/kyuubi-trino-engine/pom.xml
index b5ad5267f..9807afc82 100644
--- a/externals/kyuubi-trino-engine/pom.xml
+++ b/externals/kyuubi-trino-engine/pom.xml
@@ -40,6 +40,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-events_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.kyuubi</groupId>
<artifactId>kyuubi-ha_${scala.binary.version}</artifactId>
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
index 1a2534fb7..67eafcfe6 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/TrinoSqlEngine.scala
@@ -17,18 +17,25 @@
package org.apache.kyuubi.engine.trino
+import java.net.InetAddress
import java.util.concurrent.CountDownLatch
+import scala.util.control.NonFatal
+
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.TRINO_ENGINE_SHUTDOWN_PRIORITY
import org.apache.kyuubi.Utils.addShutdownHook
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, ENGINE_EVENT_LOGGERS}
import org.apache.kyuubi.engine.trino.TrinoSqlEngine.countDownLatch
import org.apache.kyuubi.engine.trino.TrinoSqlEngine.currentEngine
+import org.apache.kyuubi.engine.trino.event.TrinoEngineEvent
+import org.apache.kyuubi.engine.trino.event.handler.TrinoJsonLoggingEventHandler
+import org.apache.kyuubi.events.{EventBus, EventLoggerType, KyuubiEvent}
import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ZK_CONN_RETRY_POLICY
import org.apache.kyuubi.ha.client.RetryPolicies
import org.apache.kyuubi.service.Serverable
-import org.apache.kyuubi.util.SignalRegister
+import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
case class TrinoSqlEngine()
extends Serverable("TrinoSQLEngine") {
@@ -56,15 +63,47 @@ object TrinoSqlEngine extends Logging {
private val countDownLatch = new CountDownLatch(1)
val kyuubiConf: KyuubiConf = KyuubiConf()
+ .set(ENGINE_EVENT_LOGGERS.key, "JSON")
var currentEngine: Option[TrinoSqlEngine] = None
def startEngine(): Unit = {
+ try {
+ initLoggerEventHandler(kyuubiConf)
+ } catch {
+ case NonFatal(e) =>
+ warn(s"Failed to initialize Logger EventHandler: ${e.getMessage}", e)
+ }
currentEngine = Some(new TrinoSqlEngine())
currentEngine.foreach { engine =>
engine.initialize(kyuubiConf)
+ EventBus.post(TrinoEngineEvent(engine))
engine.start()
- addShutdownHook(() => engine.stop(), TRINO_ENGINE_SHUTDOWN_PRIORITY + 1)
+ EventBus.post(TrinoEngineEvent(engine))
+ addShutdownHook(
+ () => {
+ engine.stop()
+ val event = TrinoEngineEvent(engine)
+ .copy(endTime = System.currentTimeMillis())
+ EventBus.post(event)
+ },
+ TRINO_ENGINE_SHUTDOWN_PRIORITY + 1)
+ }
+ }
+
+ private def initLoggerEventHandler(conf: KyuubiConf): Unit = {
+ val hadoopConf = KyuubiHadoopUtils.newHadoopConf(conf)
+ conf.get(ENGINE_EVENT_LOGGERS).map(EventLoggerType.withName).foreach {
+ case EventLoggerType.JSON =>
+ val hostName = InetAddress.getLocalHost.getCanonicalHostName
+ val handler = TrinoJsonLoggingEventHandler(
+ s"Trino-$hostName",
+ ENGINE_EVENT_JSON_LOG_PATH,
+ hadoopConf,
+ conf)
+ EventBus.register[KyuubiEvent](handler)
+ case logger =>
+ throw new IllegalArgumentException(s"Unrecognized event logger: $logger")
}
}
@@ -84,6 +123,9 @@ object TrinoSqlEngine extends Logging {
currentEngine.foreach { engine =>
error(t)
engine.stop()
+ val event = TrinoEngineEvent(engine)
+ .copy(endTime = System.currentTimeMillis(), diagnostic = t.getMessage)
+ EventBus.post(event)
}
case t: Throwable => error("Create Trino Engine Failed", t)
}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoEngineEvent.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoEngineEvent.scala
new file mode 100644
index 000000000..7e5936360
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoEngineEvent.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.trino.TrinoSqlEngine
+import org.apache.kyuubi.events.KyuubiEvent
+import org.apache.kyuubi.service.ServiceState
+import org.apache.kyuubi.service.ServiceState.ServiceState
+
+case class TrinoEngineEvent(
+ connectionUrl: String,
+ startTime: Long,
+ endTime: Long,
+ state: ServiceState,
+ diagnostic: String,
+ settings: Map[String, String]) extends KyuubiEvent {
+
+ override def partitions: Seq[(String, String)] = {
+ // before engine is started, the start time is 0L, the partition use current day.
+ if (startTime == 0) {
+ ("day", Utils.getDateFromTimestamp(System.currentTimeMillis())) :: Nil
+ } else {
+ ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
+ }
+ }
+
+ override def toString: String = {
+ s"""
+ |TrinoEngineEvent: {
+ |connectionUrl: $connectionUrl,
+ |startTime: $startTime,
+ |endTime: $endTime,
+ |state: $state,
+ |diagnostic: $diagnostic,
+ |settings: ${settings.mkString("<", ",", ">")}
+ |}
+ |""".stripMargin
+ }
+}
+
+object TrinoEngineEvent {
+
+ def apply(engine: TrinoSqlEngine): TrinoEngineEvent = {
+ val connectionUrl =
+ if (engine.getServiceState.equals(ServiceState.LATENT)) {
+ null
+ } else {
+ engine.frontendServices.head.connectionUrl
+ }
+
+ new TrinoEngineEvent(
+ connectionUrl = connectionUrl,
+ startTime = engine.getStartTime,
+ endTime = -1L,
+ state = engine.getServiceState,
+ diagnostic = "",
+ settings = engine.getConf.getAll)
+ }
+}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoOperationEvent.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoOperationEvent.scala
new file mode 100644
index 000000000..ebcd2b3ad
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoOperationEvent.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.engine.trino.operation.TrinoOperation
+import org.apache.kyuubi.events.KyuubiEvent
+
+/**
+ * A [[TrinoOperationEvent()]] used to tracker the lifecycle of an operation
+ * at Trino SQL Engine side.
+ * <ul>
+ * <li>Operation Basis</li>
+ * <li>Operation Live Status</li>
+ * <li>Parent Session Id</li>
+ * </ul>
+ *
+ * @param statementId the unique identifier of a single operation
+ * @param statement the sql that you execute
+ * @param shouldRunAsync the flag indicating whether the query runs synchronously or not
+ * @param state the current operation state
+ * @param eventTime the time when the event created & logged
+ * @param createTime the time for changing to the current operation state
+ * @param startTime the time the query start to time of this operation
+ * @param completeTime time time the query ends
+ * @param exception: caught exception if have
+ * @param sessionId the identifier of the parent session
+ * @param sessionUser the authenticated client user
+ */
+case class TrinoOperationEvent(
+ statementId: String,
+ statement: String,
+ shouldRunAsync: Boolean,
+ state: String,
+ eventTime: Long,
+ createTime: Long,
+ startTime: Long,
+ completeTime: Long,
+ exception: Option[Throwable],
+ sessionId: String,
+ sessionUser: String) extends KyuubiEvent {
+
+ override def partitions: Seq[(String, String)] =
+ ("day", Utils.getDateFromTimestamp(createTime)) :: Nil
+}
+
+object TrinoOperationEvent {
+
+ def apply(operation: TrinoOperation): TrinoOperationEvent = {
+ val session = operation.getSession
+ val status = operation.getStatus
+ TrinoOperationEvent(
+ operation.statementId,
+ operation.redactedStatement,
+ operation.shouldRunAsync,
+ status.state.name(),
+ status.lastModified,
+ status.create,
+ status.start,
+ status.completed,
+ status.exception,
+ session.handle.identifier.toString,
+ session.user)
+ }
+}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoSessionEvent.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoSessionEvent.scala
new file mode 100644
index 000000000..66e6e9767
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/TrinoSessionEvent.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.trino.session.TrinoSessionImpl
+import org.apache.kyuubi.events.KyuubiEvent
+
+case class TrinoSessionEvent(
+ sessionId: String,
+ username: String,
+ ip: String,
+ connectionUrl: String,
+ catalog: String,
+ startTime: Long,
+ var endTime: Long = -1L,
+ var totalOperations: Int = 0) extends KyuubiEvent {
+
+ override def partitions: Seq[(String, String)] =
+ ("day", Utils.getDateFromTimestamp(startTime)) :: Nil
+
+}
+
+object TrinoSessionEvent {
+
+ def apply(session: TrinoSessionImpl): TrinoSessionEvent = {
+ val sessionConf = session.sessionManager.getConf
+ val connectionUrl = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_URL).getOrElse(
+ throw KyuubiSQLException("Trino server url can not be null!"))
+ val catalog = sessionConf.get(KyuubiConf.ENGINE_TRINO_CONNECTION_CATALOG).getOrElse(
+ throw KyuubiSQLException("Trino default catalog can not be null!"))
+ new TrinoSessionEvent(
+ session.handle.identifier.toString,
+ session.user,
+ session.ipAddress,
+ connectionUrl = connectionUrl,
+ catalog = catalog,
+ session.createTime)
+ }
+}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/handler/TrinoJsonLoggingEventHandler.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/handler/TrinoJsonLoggingEventHandler.scala
new file mode 100644
index 000000000..1d00e7bce
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/event/handler/TrinoJsonLoggingEventHandler.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event.handler
+
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.kyuubi.config.{ConfigEntry, KyuubiConf}
+import org.apache.kyuubi.events.handler.JsonLoggingEventHandler
+
+case class TrinoJsonLoggingEventHandler(
+ logName: String,
+ logPath: ConfigEntry[String],
+ hadoopConf: Configuration,
+ kyuubiConf: KyuubiConf)
+ extends JsonLoggingEventHandler(logName, logPath, hadoopConf, kyuubiConf)
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
index 1d7e28536..2f7470713 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/ExecuteStatement.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.{RejectedExecutionException, ScheduledExecutorServic
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.Logging
import org.apache.kyuubi.engine.trino.TrinoStatement
+import org.apache.kyuubi.engine.trino.event.TrinoOperationEvent
+import org.apache.kyuubi.events.EventBus
import org.apache.kyuubi.operation.ArrayFetchIterator
import org.apache.kyuubi.operation.IterableFetchIterator
import org.apache.kyuubi.operation.OperationState
@@ -112,4 +114,6 @@ class ExecuteStatement(
statementTimeoutCleaner = Some(timeoutExecutor)
}
}
+
+ EventBus.post(TrinoOperationEvent(this))
}
diff --git a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
index e53ecec7b..c774a966f 100644
--- a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
+++ b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/session/TrinoSessionImpl.scala
@@ -34,6 +34,9 @@ import org.apache.kyuubi.Utils.currentUser
import org.apache.kyuubi.config.{KyuubiConf, KyuubiReservedKeys}
import org.apache.kyuubi.engine.trino.TrinoConf
import org.apache.kyuubi.engine.trino.TrinoContext
+import org.apache.kyuubi.engine.trino.event.TrinoSessionEvent
+import org.apache.kyuubi.events.EventBus
+import org.apache.kyuubi.operation.{Operation, OperationHandle}
import org.apache.kyuubi.session.AbstractSession
import org.apache.kyuubi.session.SessionManager
@@ -49,6 +52,8 @@ class TrinoSessionImpl(
var trinoContext: TrinoContext = _
private var clientSession: ClientSession = _
+ private val sessionEvent = TrinoSessionEvent(this)
+
override def open(): Unit = {
normalizedConf.foreach {
case ("use:database", database) => clientSession = createClientSession(database)
@@ -63,6 +68,7 @@ class TrinoSessionImpl(
trinoContext = TrinoContext(httpClient, clientSession)
super.open()
+ EventBus.post(sessionEvent)
}
private def createClientSession(schema: String = null): ClientSession = {
@@ -97,4 +103,15 @@ class TrinoSessionImpl(
new Duration(clientRequestTimeout, TimeUnit.MILLISECONDS),
true)
}
+
+ override protected def runOperation(operation: Operation): OperationHandle = {
+ sessionEvent.totalOperations += 1
+ super.runOperation(operation)
+ }
+
+ override def close(): Unit = {
+ sessionEvent.endTime = System.currentTimeMillis()
+ EventBus.post(sessionEvent)
+ super.close()
+ }
}
diff --git a/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/event/TrinoSqlEventSuite.scala b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/event/TrinoSqlEventSuite.scala
new file mode 100644
index 000000000..6071e9a84
--- /dev/null
+++ b/externals/kyuubi-trino-engine/src/test/scala/org/apache/kyuubi/engine/trino/event/TrinoSqlEventSuite.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.engine.trino.event
+
+import java.io.{BufferedReader, InputStreamReader}
+import java.net.InetAddress
+import java.nio.file.Paths
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_EVENT_JSON_LOG_PATH, ENGINE_SHARE_LEVEL, ENGINE_TRINO_CONNECTION_CATALOG}
+import org.apache.kyuubi.engine.trino.WithTrinoEngine
+import org.apache.kyuubi.events.JsonProtocol
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+import org.apache.kyuubi.service.ServiceState
+
+class TrinoSqlEventSuite extends WithTrinoEngine with HiveJDBCTestHelper {
+
+ private val logRoot = kyuubiConf.get(ENGINE_EVENT_JSON_LOG_PATH)
+ private val currentDate = Utils.getDateFromTimestamp(System.currentTimeMillis())
+ private val hostName = InetAddress.getLocalHost.getCanonicalHostName
+
+ override def withKyuubiConf: Map[String, String] = Map(
+ ENGINE_TRINO_CONNECTION_CATALOG.key -> "memory",
+ ENGINE_SHARE_LEVEL.key -> "SERVER")
+
+ override protected val schema = "default"
+
+ override protected def jdbcUrl: String = getJdbcUrl
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ val fileSystem: FileSystem = FileSystem.get(new Configuration())
+ val logPath = new Path(logRoot)
+ if (fileSystem.exists(logPath)) {
+ fileSystem.delete(logPath, true)
+ }
+ fileSystem.close()
+ }
+
+ test("test engine event logging") {
+ val engineEventPath = Paths.get(
+ logRoot,
+ "trino_engine",
+ s"day=$currentDate",
+ s"Trino-$hostName.json")
+ val fileSystem: FileSystem = FileSystem.get(new Configuration())
+ val fs: FSDataInputStream = fileSystem.open(new Path(engineEventPath.toString))
+ val engineEventReader = new BufferedReader(new InputStreamReader(fs))
+ val initializedEvent =
+ JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoEngineEvent])
+ assert(initializedEvent.asInstanceOf[TrinoEngineEvent].state.equals(ServiceState.INITIALIZED))
+
+ val startedEvent =
+ JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoEngineEvent])
+ assert(startedEvent.asInstanceOf[TrinoEngineEvent].state.equals(ServiceState.STARTED))
+ }
+
+ test("test session event logging") {
+ withJdbcStatement() { statement =>
+ val catalogs = statement.getConnection.getMetaData.getCatalogs
+ assert(catalogs.next())
+ val sessionEventPath = Paths.get(
+ logRoot,
+ "trino_session",
+ s"day=$currentDate",
+ s"Trino-$hostName.json")
+ val fileSystem: FileSystem = FileSystem.get(new Configuration())
+ val fs: FSDataInputStream = fileSystem.open(new Path(sessionEventPath.toString))
+ val engineEventReader = new BufferedReader(new InputStreamReader(fs))
+ val readEvent =
+ JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoSessionEvent])
+ assert(readEvent.isInstanceOf[TrinoSessionEvent])
+ }
+ }
+
+ test("test operation event logging") {
+ withJdbcStatement() { statement =>
+ val createSchemaStatement = "CREATE SCHEMA IF NOT EXISTS memory.test_schema"
+ statement.execute(createSchemaStatement)
+ val createTableStatement = "CREATE TABLE IF NOT EXISTS " +
+ "memory.test_schema.trino_engine_test(id int)"
+ statement.execute(createTableStatement)
+ val insertStatement = "INSERT INTO memory.test_schema.trino_engine_test SELECT 1"
+ statement.execute(insertStatement)
+
+ val operationEventPath = Paths.get(
+ logRoot,
+ "trino_operation",
+ s"day=$currentDate",
+ s"Trino-$hostName.json")
+
+ val fileSystem: FileSystem = FileSystem.get(new Configuration())
+ val fs: FSDataInputStream = fileSystem.open(new Path(operationEventPath.toString))
+ val engineEventReader = new BufferedReader(new InputStreamReader(fs))
+
+ val createSchemaEvent =
+ JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoOperationEvent])
+ assert(createSchemaEvent.isInstanceOf[TrinoOperationEvent])
+ assert(StringUtils.equals(
+ createSchemaEvent.asInstanceOf[TrinoOperationEvent].statement,
+ createSchemaStatement))
+
+ val createTableEvent =
+ JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoOperationEvent])
+ assert(createTableEvent.isInstanceOf[TrinoOperationEvent])
+ assert(StringUtils.equals(
+ createTableEvent.asInstanceOf[TrinoOperationEvent].statement,
+ createTableStatement))
+
+ val insertEvent =
+ JsonProtocol.jsonToEvent(engineEventReader.readLine(), classOf[TrinoOperationEvent])
+ assert(insertEvent.isInstanceOf[TrinoOperationEvent])
+ assert(StringUtils.equals(
+ insertEvent.asInstanceOf[TrinoOperationEvent].statement,
+ insertStatement))
+ }
+ }
+
+}