You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@kyuubi.apache.org by "ulysses-you (via GitHub)" <gi...@apache.org> on 2023/06/01 10:24:04 UTC

[GitHub] [kyuubi] ulysses-you commented on a diff in pull request #4813: Add the fetchorientation parameter to the /v1/operations/:operationId/log interface

ulysses-you commented on code in PR #4813:
URL: https://github.com/apache/kyuubi/pull/4813#discussion_r1212931337


##########
kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala:
##########
@@ -140,11 +140,13 @@ private[v1] class OperationsResource extends ApiRequestContext with Logging {
   @Path("{operationHandle}/log")
   def getOperationLog(
       @PathParam("operationHandle") operationHandleStr: String,
-      @QueryParam("maxrows") maxRows: Int): OperationLog = {
+      @QueryParam("maxrows") @DefaultValue("100") maxRows: Int,
+      @QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT")
+      fetchOrientation: String): OperationLog = {
     try {
       val rowSet = fe.be.sessionManager.operationManager.getOperationLogRowSet(
         OperationHandle(operationHandleStr),
-        FetchOrientation.FETCH_NEXT,
+        FetchOrientation.withName(fetchOrientation),

Review Comment:
   can we fast fail there that throw exception if user requests `FETCH_PRIOR` ?



##########
kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala:
##########
@@ -154,14 +167,25 @@ class OperationLog(path: Path) {
     tRow
   }
 
+  def read(maxRows: Int): TRowSet = synchronized {
+    read(FETCH_NEXT, maxRows)
+  }
+
   /**
    * Read to log file line by line
    *
    * @param maxRows maximum result number can reach
+   * @param order   the  fetch orientation of the result, can be FETCH_NEXT, FETCH_FIRST
    */
-  def read(maxRows: Int): TRowSet = synchronized {
+  def read(order: FetchOrientation = FETCH_NEXT, maxRows: Int): TRowSet = synchronized {
     if (!initialized) return ThriftUtils.newEmptyRowSet
-    val (logs, lines) = readLogs(reader, maxRows, maxRows)
+    if (order != FETCH_NEXT && order != FETCH_FIRST) {
+      throw KyuubiSQLException(s"$order in operation log is not supported")
+    }
+    if (order == FETCH_FIRST) {
+      resetReader()

Review Comment:
   how about just close seekableReader and create a new seekableReader?



##########
kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala:
##########
@@ -172,6 +196,21 @@ class OperationLog(path: Path) {
     toRowSet(logs)
   }
 
+  private def resetReader(): Unit = {
+    lock.synchronized {
+      trySafely {
+        if (reader != null) {
+          reader.close()
+        }
+      }
+      reader = null
+      closeExtraReaders()
+      extraReaders.clear()
+      extraPaths.foreach(path =>
+        extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8))
+    }
+  }
+
   def read(from: Int, size: Int): TRowSet = synchronized {

Review Comment:
   this is used for batch mode, we can make a new pr to support FETCH_FIRST.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@kyuubi.apache.org
For additional commands, e-mail: notifications-help@kyuubi.apache.org